You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2019/10/17 03:18:12 UTC
[camel] branch master updated: CAMEL-14073 - camel-hdfs - Cleanup
HA/Cluster related classes (#3254)
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push:
new 3032641 CAMEL-14073 - camel-hdfs - Cleanup HA/Cluster related classes (#3254)
3032641 is described below
commit 3032641514838fcab5e5a37ee01dfa82a9f59cb7
Author: Marius Cornescu <ma...@yahoo.com>
AuthorDate: Thu Oct 17 05:17:57 2019 +0200
CAMEL-14073 - camel-hdfs - Cleanup HA/Cluster related classes (#3254)
* CAMEL-14073 - camel-hdfs - Cleanup HA/Cluster related classes
* CAMEL-14073 : camel-hdfs - Cleanup HA/Cluster related classes
* Update hdfs-component.adoc
* Update hdfs-component.adoc
---
components/camel-hdfs/pom.xml | 5 ++
.../camel-hdfs/src/main/docs/hdfs-component.adoc | 18 ++++++
.../component/hdfs/HaConfigurationBuilder.java | 52 ++++++++++++---
.../camel/component/hdfs/HdfsArrayFileType.java | 4 +-
.../camel/component/hdfs/HdfsBloommapFileType.java | 4 +-
.../camel/component/hdfs/HdfsConfiguration.java | 19 +++++-
.../apache/camel/component/hdfs/HdfsConsumer.java | 21 +++---
.../org/apache/camel/component/hdfs/HdfsInfo.java | 56 +++-------------
.../camel/component/hdfs/HdfsInfoFactory.java | 63 ++++++++++++++++--
.../camel/component/hdfs/HdfsInputStream.java | 31 ++++-----
.../camel/component/hdfs/HdfsMapFileType.java | 4 +-
.../camel/component/hdfs/HdfsOsgiHelper.java | 8 +--
.../camel/component/hdfs/HdfsOutputStream.java | 46 ++++++-------
.../apache/camel/component/hdfs/HdfsProducer.java | 75 ++++++++++------------
.../camel/component/hdfs/HdfsSequenceFileType.java | 4 +-
.../kerberos/KerberosConfigurationBuilder.java | 7 +-
.../camel/component/hdfs/FromFileToHdfsTest.java | 8 +--
.../component/hdfs/HaConfigurationBuilderTest.java | 41 ++++++++++--
.../camel/component/hdfs/HdfsConsumerTest.java | 36 +++++------
.../apache/camel/component/hdfs/HdfsInfoTest.java} | 36 +++++++----
.../component/hdfs/HdfsProducerConsumerTest.java | 6 +-
.../component/hdfs/HdfsProducerSplitTest.java | 8 +--
.../camel/component/hdfs/HdfsProducerTest.java | 34 +++++-----
.../camel/component/hdfs/HdfsTestSupport.java | 44 +++++++++++--
.../hdfs/kerberos/KerberosAuthenticationTest.java | 11 ++--
.../kerberos/KerberosConfigurationBuilderTest.java | 16 ++---
26 files changed, 394 insertions(+), 263 deletions(-)
diff --git a/components/camel-hdfs/pom.xml b/components/camel-hdfs/pom.xml
index b7bbc17e..d2c9532 100644
--- a/components/camel-hdfs/pom.xml
+++ b/components/camel-hdfs/pom.xml
@@ -139,6 +139,11 @@
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<scope>test</scope>
diff --git a/components/camel-hdfs/src/main/docs/hdfs-component.adoc b/components/camel-hdfs/src/main/docs/hdfs-component.adoc
index 423f915..ae3703c 100644
--- a/components/camel-hdfs/src/main/docs/hdfs-component.adoc
+++ b/components/camel-hdfs/src/main/docs/hdfs-component.adoc
@@ -327,6 +327,24 @@ resource with bundle that contains blueprint definition.
This way Hadoop 2.x will have correct mapping of URI schemes to
filesystem implementations.
+=== Using this component with a HighAvailability configuration
+
+In a HA setup, there will be multiple nodes (_configured through the *namedNodes* parameter_).
+The "hostname" and "port" portion of the endpoint uri will no longer have a _"host"_ meaning, but it will represent the name given to the cluster.
+
+You can choose whatever name you want for the cluster (_the name should follow the [a-zA-Z0-9] convention_).
+This name will be sanitized by replacing the _dirty_ characters with underscores. This is done so that a host name or IP could potentially be used, if it makes sense to you.
+
+The cluster name will be mapped to the HA filesystem with a corresponding proxy, with failover, and the _works_.
+
+[source,java]
+------------------------------------------------------------------------------------------------------
+
+from("hdfs://node1_and_2_cluster/dir1/dir2?namedNodes=node1.example.org:8020,node2.example.org:8020").routeId(...)
+...
+------------------------------------------------------------------------------------------------------
+
+
=== Using this component with Kerberos authentication
The kerberos config file is read when the camel component is created, not when the endpoint is created.
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HaConfigurationBuilder.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HaConfigurationBuilder.java
index 1d432b3..ca5daad 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HaConfigurationBuilder.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HaConfigurationBuilder.java
@@ -19,6 +19,7 @@ package org.apache.camel.component.hdfs;
import java.util.List;
import java.util.stream.Collectors;
+import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
@@ -31,6 +32,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
final class HaConfigurationBuilder {
private static final String HFDS_NAMED_SERVICE = "hfdsNamedService";
+ private static final String HFDS_NAMED_SERVICE_SEPARATOR = "_";
private static final String HFDS_FS = "fs.defaultFS";
private HaConfigurationBuilder() {
@@ -49,31 +51,63 @@ final class HaConfigurationBuilder {
* configuration.set("dfs.client.failover.proxy.provider.hfdsNamedService", "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
* <p>
*
- * @param namedNodes - All named nodes from the hadoop cluster
- * @param replicationFactor - dfs replication factor
+ * @param configuration - hdfs configuration that will be setup with the HA settings
+ * @param endpointConfig - configuration with the HA settings configured on the endpoint
*/
- static void withClusterConfiguration(Configuration configuration, List<String> namedNodes, int replicationFactor) {
+ static void withClusterConfiguration(Configuration configuration, HdfsConfiguration endpointConfig) {
+ String haNamedService = getSanitizedClusterName(endpointConfig.getHostName());
+ withClusterConfiguration(configuration, haNamedService, endpointConfig.getNamedNodeList(), endpointConfig.getReplication());
+ }
+
+ /**
+ * Generates the correct HA configuration (normally read from xml) based on the namedNodes:
+ * All named nodes have to be qualified: configuration.set("dfs.ha.namenodes.hfdsNamedService","namenode1,namenode2");
+ * For each named node the following entry is added
+ * <p>
+ * configuration.set("dfs.namenode.rpc-address.hfdsNamedService.namenode1", "namenode1:1234");
+ * <p>
+ * Finally the proxy provider has to be specified:
+ * <p>
+ * configuration.set("dfs.client.failover.proxy.provider.hfdsNamedService", "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
+ * <p>
+ *
+ * @param configuration - hdfs configuration that will be setup with the HA settings
+ * @param haNamedService - the name of the HA named service that represents the cluster (used to resolve the FS)
+ * @param namedNodes - All named nodes from the hadoop cluster
+ * @param replicationFactor - dfs replication factor
+ */
+ static void withClusterConfiguration(Configuration configuration, String haNamedService, List<String> namedNodes, int replicationFactor) {
configuration.set(DFSConfigKeys.DFS_REPLICATION_KEY, Integer.toString(replicationFactor));
- configuration.set(DFSConfigKeys.DFS_NAMESERVICES, HFDS_NAMED_SERVICE);
+ configuration.set(DFSConfigKeys.DFS_NAMESERVICES, haNamedService);
configuration.set(
- DFSUtil.addKeySuffixes(DFS_HA_NAMENODES_KEY_PREFIX, HFDS_NAMED_SERVICE),
+ DFSUtil.addKeySuffixes(DFS_HA_NAMENODES_KEY_PREFIX, haNamedService),
nodeToString(namedNodes.stream().map(HaConfigurationBuilder::nodeToString).collect(Collectors.joining(",")))
);
namedNodes.forEach(nodeName ->
configuration.set(
- DFSUtil.addKeySuffixes(DFS_NAMENODE_RPC_ADDRESS_KEY, HFDS_NAMED_SERVICE, nodeToString(nodeName)),
+ DFSUtil.addKeySuffixes(DFS_NAMENODE_RPC_ADDRESS_KEY, haNamedService, nodeToString(nodeName)),
nodeName)
);
- configuration.set(DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX + "." + HFDS_NAMED_SERVICE, ConfiguredFailoverProxyProvider.class.getName());
+ configuration.set(DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX + "." + haNamedService, ConfiguredFailoverProxyProvider.class.getName());
+
+ configuration.set(HFDS_FS, "hdfs://" + haNamedService);
+
+ }
+
+ static String getSanitizedClusterName(String rawClusterName) {
+ String clusterName = HFDS_NAMED_SERVICE;
- configuration.set(HFDS_FS, "hdfs://" + HFDS_NAMED_SERVICE);
+ if (StringUtils.isNotEmpty(rawClusterName)) {
+ clusterName = rawClusterName.replaceAll("\\.", HFDS_NAMED_SERVICE_SEPARATOR);
+ }
+ return clusterName;
}
private static String nodeToString(String nodeName) {
- return nodeName.replaceAll(":[0-9]*", "").replaceAll("\\.", "_");
+ return nodeName.replaceAll(":[0-9]*", "").replaceAll("\\.", HFDS_NAMED_SERVICE_SEPARATOR);
}
}
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsArrayFileType.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsArrayFileType.java
index 4c4123c..8b5aa8d 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsArrayFileType.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsArrayFileType.java
@@ -65,7 +65,7 @@ class HdfsArrayFileType extends DefaultHdfsFileType {
Closeable rout;
HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath, configuration);
Class<? extends WritableComparable> valueWritableClass = configuration.getValueType().getWritableClass();
- rout = new ArrayFile.Writer(hdfsInfo.getConf(), hdfsInfo.getFileSystem(), hdfsPath, valueWritableClass,
+ rout = new ArrayFile.Writer(hdfsInfo.getConfiguration(), hdfsInfo.getFileSystem(), hdfsPath, valueWritableClass,
configuration.getCompressionType(), () -> { });
return rout;
} catch (IOException ex) {
@@ -78,7 +78,7 @@ class HdfsArrayFileType extends DefaultHdfsFileType {
try {
Closeable rin;
HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath, configuration);
- rin = new ArrayFile.Reader(hdfsInfo.getFileSystem(), hdfsPath, hdfsInfo.getConf());
+ rin = new ArrayFile.Reader(hdfsInfo.getFileSystem(), hdfsPath, hdfsInfo.getConfiguration());
return rin;
} catch (IOException ex) {
throw new RuntimeCamelException(ex);
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsBloommapFileType.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsBloommapFileType.java
index be4c6d3..fa0c0ab 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsBloommapFileType.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsBloommapFileType.java
@@ -73,7 +73,7 @@ class HdfsBloommapFileType extends DefaultHdfsFileType {
HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath, configuration);
Class<? extends WritableComparable> keyWritableClass = configuration.getKeyType().getWritableClass();
Class<? extends WritableComparable> valueWritableClass = configuration.getValueType().getWritableClass();
- rout = new BloomMapFile.Writer(hdfsInfo.getConf(), new Path(hdfsPath), MapFile.Writer.keyClass(keyWritableClass),
+ rout = new BloomMapFile.Writer(hdfsInfo.getConfiguration(), new Path(hdfsPath), MapFile.Writer.keyClass(keyWritableClass),
MapFile.Writer.valueClass(valueWritableClass),
MapFile.Writer.compression(configuration.getCompressionType(), configuration.getCompressionCodec().getCodec()),
MapFile.Writer.progressable(() -> {
@@ -89,7 +89,7 @@ class HdfsBloommapFileType extends DefaultHdfsFileType {
try {
Closeable rin;
HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath, configuration);
- rin = new BloomMapFile.Reader(new Path(hdfsPath), hdfsInfo.getConf());
+ rin = new BloomMapFile.Reader(new Path(hdfsPath), hdfsInfo.getConfiguration());
return rin;
} catch (IOException ex) {
throw new RuntimeCamelException(ex);
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConfiguration.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConfiguration.java
index be5bdb3..c5d3dc4 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConfiguration.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConfiguration.java
@@ -98,6 +98,7 @@ public class HdfsConfiguration {
private String kerberosKeytabLocation;
public HdfsConfiguration() {
+ // default constructor
}
private Boolean getBoolean(Map<String, Object> hdfsSettings, String param, Boolean dflt) {
@@ -207,7 +208,7 @@ public class HdfsConfiguration {
private List<String> getNamedNodeList(Map<String, Object> hdfsSettings) {
namedNodes = getString(hdfsSettings, "namedNodes", namedNodes);
-
+
if (isNotEmpty(namedNodes)) {
return Arrays.stream(namedNodes.split(",")).distinct().collect(Collectors.toList());
}
@@ -560,6 +561,10 @@ public class HdfsConfiguration {
return namedNodeList;
}
+ public boolean hasClusterConfiguration() {
+ return !getNamedNodeList().isEmpty();
+ }
+
public String getKerberosConfigFileLocation() {
return kerberosConfigFileLocation;
}
@@ -595,7 +600,17 @@ public class HdfsConfiguration {
}
public boolean isKerberosAuthentication() {
- return isNotEmpty(namedNodes) && isNotEmpty(kerberosConfigFileLocation) && isNotEmpty(kerberosUsername) && isNotEmpty(kerberosKeytabLocation);
+ return isNotEmpty(kerberosConfigFileLocation) && isNotEmpty(kerberosUsername) && isNotEmpty(kerberosKeytabLocation);
+ }
+
+ /**
+ * Get the label of the hdfs file system like: HOST_NAME:PORT/PATH
+ *
+ * @param path
+ * @return HOST_NAME:PORT/PATH
+ */
+ String getFileSystemLabel(String path) {
+ return String.format("%s:%s/%s", getHostName(), getPort(), path);
}
}
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java
index 9971f74..52b1249 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java
@@ -38,12 +38,10 @@ import org.apache.hadoop.fs.PathFilter;
public final class HdfsConsumer extends ScheduledPollConsumer {
- public static final long DEFAULT_CONSUMER_INITIAL_DELAY = 10 * 1000L;
-
private final HdfsConfiguration config;
private final StringBuilder hdfsPath;
private final Processor processor;
- private final ReadWriteLock rwlock = new ReentrantReadWriteLock();
+ private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
public HdfsConsumer(HdfsEndpoint endpoint, Processor processor, HdfsConfiguration config) {
super(endpoint, processor);
@@ -69,24 +67,21 @@ public final class HdfsConsumer extends ScheduledPollConsumer {
}
private HdfsInfo setupHdfs(boolean onStartup) throws IOException {
+ String hdfsFsDescription = config.getFileSystemLabel(hdfsPath.toString());
// if we are starting up then log at info level, and if runtime then log at debug level to not flood the log
if (onStartup) {
- log.info("Connecting to hdfs file-system {}:{}/{} (may take a while if connection is not available)", config.getHostName(), config.getPort(), hdfsPath);
+ log.info("Connecting to hdfs file-system {} (may take a while if connection is not available)", hdfsFsDescription);
} else {
- if (log.isDebugEnabled()) {
- log.debug("Connecting to hdfs file-system {}:{}/{} (may take a while if connection is not available)", config.getHostName(), config.getPort(), hdfsPath);
- }
+ log.debug("Connecting to hdfs file-system {} (may take a while if connection is not available)", hdfsFsDescription);
}
// hadoop will cache the connection by default so its faster to get in the poll method
HdfsInfo answer = HdfsInfoFactory.newHdfsInfo(this.hdfsPath.toString(), config);
if (onStartup) {
- log.info("Connected to hdfs file-system {}:{}/{}", config.getHostName(), config.getPort(), hdfsPath);
+ log.info("Connected to hdfs file-system {}", hdfsFsDescription);
} else {
- if (log.isDebugEnabled()) {
- log.debug("Connected to hdfs file-system {}:{}/{}", config.getHostName(), config.getPort(), hdfsPath);
- }
+ log.debug("Connected to hdfs file-system {}", hdfsFsDescription);
}
return answer;
}
@@ -199,11 +194,11 @@ public final class HdfsConsumer extends ScheduledPollConsumer {
private HdfsInputStream createInputStream(FileStatus fileStatus) {
try {
- this.rwlock.writeLock().lock();
+ this.rwLock.writeLock().lock();
return HdfsInputStream.createInputStream(fileStatus.getPath().toString(), this.config);
} finally {
- this.rwlock.writeLock().unlock();
+ this.rwLock.writeLock().unlock();
}
}
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInfo.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInfo.java
index bd9f245..ff8035e 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInfo.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInfo.java
@@ -16,29 +16,23 @@
*/
package org.apache.camel.component.hdfs;
-import java.io.IOException;
-import java.net.URI;
-import java.util.List;
-
-import org.apache.camel.component.hdfs.kerberos.KerberosAuthentication;
-import org.apache.camel.component.hdfs.kerberos.KerberosConfigurationBuilder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-public final class HdfsInfo {
+final class HdfsInfo {
- private Configuration configuration;
- private FileSystem fileSystem;
- private Path path;
+ private final Configuration configuration;
+ private final FileSystem fileSystem;
+ private final Path path;
- HdfsInfo(String hdfsPath, HdfsConfiguration endpointConfig) throws IOException {
- this.configuration = newConfiguration(endpointConfig);
- this.fileSystem = newFileSystem(this.configuration, hdfsPath, endpointConfig);
- this.path = new Path(hdfsPath);
+ HdfsInfo(Configuration configuration, FileSystem fileSystem, Path hdfsPath) {
+ this.configuration = configuration;
+ this.fileSystem = fileSystem;
+ this.path = hdfsPath;
}
- public Configuration getConf() {
+ public Configuration getConfiguration() {
return configuration;
}
@@ -50,36 +44,4 @@ public final class HdfsInfo {
return path;
}
- private static Configuration newConfiguration(HdfsConfiguration endpointConfig) {
- Configuration configuration = new Configuration();
-
- if (endpointConfig.isKerberosAuthentication()) {
- String kerberosConfigFileLocation = endpointConfig.getKerberosConfigFileLocation();
- KerberosConfigurationBuilder.withKerberosConfiguration(configuration, kerberosConfigFileLocation);
-
- }
-
- List<String> namedNodes = endpointConfig.getNamedNodeList();
- if (!namedNodes.isEmpty()) {
- HaConfigurationBuilder.withClusterConfiguration(configuration, endpointConfig.getNamedNodeList(), endpointConfig.getReplication());
-
- }
-
- return configuration;
- }
-
- /**
- * this will connect to the hadoop hdfs file system, and in case of no connection
- * then the hardcoded timeout in hadoop is 45 x 20 sec = 15 minutes
- */
- private static FileSystem newFileSystem(Configuration configuration, String hdfsPath, HdfsConfiguration endpointConfig) throws IOException {
- if (endpointConfig.isKerberosAuthentication()) {
- String userName = endpointConfig.getKerberosUsername();
- String keytabLocation = endpointConfig.getKerberosKeytabLocation();
- new KerberosAuthentication(configuration, userName, keytabLocation).loginWithKeytab();
- }
-
- return FileSystem.get(URI.create(hdfsPath), configuration);
- }
-
}
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInfoFactory.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInfoFactory.java
index 4862a70..6bb47da 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInfoFactory.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInfoFactory.java
@@ -17,22 +17,77 @@
package org.apache.camel.component.hdfs;
import java.io.IOException;
+import java.net.URI;
-import javax.security.auth.login.Configuration;
+import org.apache.camel.component.hdfs.kerberos.KerberosAuthentication;
+import org.apache.camel.component.hdfs.kerberos.KerberosConfigurationBuilder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
public final class HdfsInfoFactory {
private HdfsInfoFactory() {
+ // hidden
}
- public static HdfsInfo newHdfsInfo(String hdfsPath, HdfsConfiguration configuration) throws IOException {
+ static HdfsInfo newHdfsInfo(String hdfsPath, HdfsConfiguration endpointConfig) throws IOException {
// need to remember auth as Hadoop will override that, which otherwise means the Auth is broken afterwards
- Configuration auth = HdfsComponent.getJAASConfiguration();
+ javax.security.auth.login.Configuration auth = HdfsComponent.getJAASConfiguration();
try {
- return new HdfsInfo(hdfsPath, configuration);
+ return newHdfsInfoWithoutAuth(hdfsPath, endpointConfig);
} finally {
HdfsComponent.setJAASConfiguration(auth);
}
}
+ static HdfsInfo newHdfsInfoWithoutAuth(String hdfsPath, HdfsConfiguration endpointConfig) throws IOException {
+ Configuration configuration = newConfiguration(endpointConfig);
+
+ authenticate(configuration, endpointConfig);
+
+ FileSystem fileSystem = newFileSystem(configuration, hdfsPath, endpointConfig);
+ Path path = new Path(hdfsPath);
+
+ return new HdfsInfo(configuration, fileSystem, path);
+ }
+
+ static Configuration newConfiguration(HdfsConfiguration endpointConfig) {
+ Configuration configuration = new Configuration();
+
+ if (endpointConfig.isKerberosAuthentication()) {
+ KerberosConfigurationBuilder.withKerberosConfiguration(configuration, endpointConfig);
+ }
+
+ if (endpointConfig.hasClusterConfiguration()) {
+ HaConfigurationBuilder.withClusterConfiguration(configuration, endpointConfig);
+ }
+
+ return configuration;
+ }
+
+ static void authenticate(Configuration configuration, HdfsConfiguration endpointConfig) throws IOException {
+ if (endpointConfig.isKerberosAuthentication()) {
+ String userName = endpointConfig.getKerberosUsername();
+ String keytabLocation = endpointConfig.getKerberosKeytabLocation();
+ new KerberosAuthentication(configuration, userName, keytabLocation).loginWithKeytab();
+ }
+ }
+
+ /**
+ * this will connect to the hadoop hdfs file system, and in case of no connection
+ * then the hardcoded timeout in hadoop is 45 x 20 sec = 15 minutes
+ */
+ static FileSystem newFileSystem(Configuration configuration, String hdfsPath, HdfsConfiguration endpointConfig) throws IOException {
+ FileSystem fileSystem;
+ if (endpointConfig.hasClusterConfiguration()) {
+ // using default FS that was set during the cluster configuration (@see org.apache.camel.component.hdfs.HaConfigurationBuilder)
+ fileSystem = FileSystem.get(configuration);
+ } else {
+ fileSystem = FileSystem.get(URI.create(hdfsPath), configuration);
+ }
+
+ return fileSystem;
+ }
+
}
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java
index 40212e8..49e3ddc 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java
@@ -18,7 +18,6 @@ package org.apache.camel.component.hdfs;
import java.io.Closeable;
import java.io.IOException;
-import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.fs.Path;
@@ -53,29 +52,27 @@ public class HdfsInputStream implements Closeable {
* @throws IOException
*/
public static HdfsInputStream createInputStream(String hdfsPath, HdfsConfiguration configuration) {
- HdfsInputStream ret = new HdfsInputStream();
- ret.fileType = configuration.getFileType();
- ret.actualPath = hdfsPath;
- ret.suffixedPath = ret.actualPath + '.' + configuration.getOpenedSuffix();
- ret.suffixedReadPath = ret.actualPath + '.' + configuration.getReadSuffix();
- ret.chunkSize = configuration.getChunkSize();
+ HdfsInputStream iStream = new HdfsInputStream();
+ iStream.fileType = configuration.getFileType();
+ iStream.actualPath = hdfsPath;
+ iStream.suffixedPath = iStream.actualPath + '.' + configuration.getOpenedSuffix();
+ iStream.suffixedReadPath = iStream.actualPath + '.' + configuration.getReadSuffix();
+ iStream.chunkSize = configuration.getChunkSize();
try {
- HdfsInfo info = HdfsInfoFactory.newHdfsInfo(ret.actualPath, configuration);
- if (info.getFileSystem().rename(new Path(ret.actualPath), new Path(ret.suffixedPath))) {
- ret.in = ret.fileType.createInputStream(ret.suffixedPath, configuration);
- ret.opened = true;
- ret.config = configuration;
+ HdfsInfo info = HdfsInfoFactory.newHdfsInfo(iStream.actualPath, configuration);
+ if (info.getFileSystem().rename(new Path(iStream.actualPath), new Path(iStream.suffixedPath))) {
+ iStream.in = iStream.fileType.createInputStream(iStream.suffixedPath, configuration);
+ iStream.opened = true;
+ iStream.config = configuration;
} else {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Failed to open file [{}] because it doesn't exist", hdfsPath);
- }
- ret = null;
+ LOG.debug("Failed to open file [{}] because it doesn't exist", hdfsPath);
+ iStream = null;
}
} catch (IOException e) {
throw new RuntimeException(e);
}
- return ret;
+ return iStream;
}
@Override
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsMapFileType.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsMapFileType.java
index 01868b7..fbd3e04 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsMapFileType.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsMapFileType.java
@@ -72,7 +72,7 @@ class HdfsMapFileType extends DefaultHdfsFileType {
HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath, configuration);
Class<? extends WritableComparable> keyWritableClass = configuration.getKeyType().getWritableClass();
Class<? extends WritableComparable> valueWritableClass = configuration.getValueType().getWritableClass();
- rout = new MapFile.Writer(hdfsInfo.getConf(), new Path(hdfsPath), MapFile.Writer.keyClass(keyWritableClass), MapFile.Writer.valueClass(valueWritableClass),
+ rout = new MapFile.Writer(hdfsInfo.getConfiguration(), new Path(hdfsPath), MapFile.Writer.keyClass(keyWritableClass), MapFile.Writer.valueClass(valueWritableClass),
MapFile.Writer.compression(configuration.getCompressionType(), configuration.getCompressionCodec().getCodec()),
MapFile.Writer.progressable(() -> {
}));
@@ -87,7 +87,7 @@ class HdfsMapFileType extends DefaultHdfsFileType {
try {
Closeable rin;
HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath, configuration);
- rin = new MapFile.Reader(new Path(hdfsPath), hdfsInfo.getConf());
+ rin = new MapFile.Reader(new Path(hdfsPath), hdfsInfo.getConfiguration());
return rin;
} catch (IOException ex) {
throw new RuntimeCamelException(ex);
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsOsgiHelper.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsOsgiHelper.java
index 9d9a5cc..51efaf7 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsOsgiHelper.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsOsgiHelper.java
@@ -43,10 +43,10 @@ public class HdfsOsgiHelper {
Configuration conf = new Configuration();
// set that as the hdfs configuration's classloader
conf.setClassLoader(cl);
- for (String key : fileSystems.keySet()) {
- URI uri = URI.create(key);
- conf.setClass(String.format("fs.%s.impl", uri.getScheme()), cl.loadClass(fileSystems.get(key)), FileSystem.class);
- LOG.debug("Successfully loaded class: {}", fileSystems.get(key));
+ for (Map.Entry<String, String> fsEntry : fileSystems.entrySet()) {
+ URI uri = URI.create(fsEntry.getKey());
+ conf.setClass(String.format("fs.%s.impl", uri.getScheme()), cl.loadClass(fsEntry.getValue()), FileSystem.class);
+ LOG.debug("Successfully loaded class: {}", fsEntry.getValue());
FileSystem.get(uri, conf);
LOG.debug("Successfully got uri: {} from FileSystem Object", uri);
}
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsOutputStream.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsOutputStream.java
index 6e623e3..49a3a00 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsOutputStream.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsOutputStream.java
@@ -43,35 +43,35 @@ public class HdfsOutputStream implements Closeable {
}
public static HdfsOutputStream createOutputStream(String hdfsPath, HdfsConfiguration configuration) throws IOException {
- HdfsOutputStream ret = new HdfsOutputStream();
- ret.fileType = configuration.getFileType();
- ret.actualPath = hdfsPath;
- ret.info = new HdfsInfo(ret.actualPath, configuration);
+ HdfsOutputStream oStream = new HdfsOutputStream();
+ oStream.fileType = configuration.getFileType();
+ oStream.actualPath = hdfsPath;
+ oStream.info = HdfsInfoFactory.newHdfsInfoWithoutAuth(oStream.actualPath, configuration);
+
+ oStream.suffixedPath = oStream.actualPath + '.' + configuration.getOpenedSuffix();
+
+ Path actualPath = new Path(oStream.actualPath);
+ boolean actualPathExists = oStream.info.getFileSystem().exists(actualPath);
- ret.suffixedPath = ret.actualPath + '.' + configuration.getOpenedSuffix();
if (configuration.isWantAppend() || configuration.isAppend()) {
- if (!ret.info.getFileSystem().exists(new Path(ret.actualPath))) {
- configuration.setAppend(false);
- } else {
+ if (actualPathExists) {
configuration.setAppend(true);
- ret.info = new HdfsInfo(ret.suffixedPath, configuration);
- ret.info.getFileSystem().rename(new Path(ret.actualPath), new Path(ret.suffixedPath));
+ oStream.info = HdfsInfoFactory.newHdfsInfoWithoutAuth(oStream.suffixedPath, configuration);
+ oStream.info.getFileSystem().rename(actualPath, new Path(oStream.suffixedPath));
+ } else {
+ configuration.setAppend(false);
}
- } else {
- if (ret.info.getFileSystem().exists(new Path(ret.actualPath))) {
- //only check if not directory
- if (!ret.info.getFileSystem().isDirectory(new Path(ret.actualPath))) {
- if (configuration.isOverwrite()) {
- ret.info.getFileSystem().delete(new Path(ret.actualPath), true);
- } else {
- throw new RuntimeCamelException("The file already exists");
- }
- }
+ } else if (actualPathExists && !oStream.info.getFileSystem().isDirectory(actualPath)) { // only check if not directory
+ if (configuration.isOverwrite()) {
+ oStream.info.getFileSystem().delete(actualPath, true);
+ } else {
+ throw new RuntimeCamelException("The file already exists");
}
}
- ret.out = ret.fileType.createOutputStream(ret.suffixedPath, configuration);
- ret.opened = true;
- return ret;
+
+ oStream.out = oStream.fileType.createOutputStream(oStream.suffixedPath, configuration);
+ oStream.opened = true;
+ return oStream;
}
@Override
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java
index 568ee71..048a120 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java
@@ -30,18 +30,14 @@ import org.apache.camel.Expression;
import org.apache.camel.support.DefaultProducer;
import org.apache.camel.util.IOHelper;
import org.apache.camel.util.StringHelper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
public class HdfsProducer extends DefaultProducer {
- private static final Logger LOG = LoggerFactory.getLogger(HdfsProducer.class);
-
private final HdfsConfiguration config;
private final StringBuilder hdfsPath;
private final AtomicBoolean idle = new AtomicBoolean(false);
private volatile ScheduledExecutorService scheduler;
- private volatile HdfsOutputStream ostream;
+ private volatile HdfsOutputStream oStream;
public static final class SplitStrategy {
private SplitStrategyType type;
@@ -106,7 +102,7 @@ public class HdfsProducer extends DefaultProducer {
// setup hdfs if configured to do on startup
if (getEndpoint().getConfig().isConnectOnStartup()) {
- ostream = setupHdfs(true);
+ oStream = setupHdfs(true);
}
Optional<SplitStrategy> idleStrategy = tryFindIdleStrategy(config.getSplitStrategies());
@@ -116,8 +112,8 @@ public class HdfsProducer extends DefaultProducer {
scheduler.scheduleAtFixedRate(new IdleCheck(idleStrategy.get()), config.getCheckIdleInterval(), config.getCheckIdleInterval(), TimeUnit.MILLISECONDS);
}
} catch (Exception e) {
- LOG.warn("Failed to start the HDFS producer. Caused by: [{}]", e.getMessage());
- LOG.debug("", e);
+ log.warn("Failed to start the HDFS producer. Caused by: [{}]", e.getMessage());
+ log.debug("", e);
throw new RuntimeException(e);
} finally {
HdfsComponent.setJAASConfiguration(auth);
@@ -125,8 +121,8 @@ public class HdfsProducer extends DefaultProducer {
}
private synchronized HdfsOutputStream setupHdfs(boolean onStartup) throws IOException {
- if (ostream != null) {
- return ostream;
+ if (oStream != null) {
+ return oStream;
}
StringBuilder actualPath = new StringBuilder(hdfsPath);
@@ -134,23 +130,21 @@ public class HdfsProducer extends DefaultProducer {
actualPath = newFileName();
}
+ String hdfsFsDescription = config.getFileSystemLabel(actualPath.toString());
+
// if we are starting up then log at info level, and if runtime then log at debug level to not flood the log
if (onStartup) {
- log.info("Connecting to hdfs file-system {}:{}/{} (may take a while if connection is not available)", config.getHostName(), config.getPort(), actualPath);
+ log.info("Connecting to hdfs file-system {} (may take a while if connection is not available)", hdfsFsDescription);
} else {
- if (log.isDebugEnabled()) {
- log.debug("Connecting to hdfs file-system {}:{}/{} (may take a while if connection is not available)", config.getHostName(), config.getPort(), actualPath);
- }
+ log.debug("Connecting to hdfs file-system {} (may take a while if connection is not available)", hdfsFsDescription);
}
HdfsOutputStream answer = HdfsOutputStream.createOutputStream(actualPath.toString(), config);
if (onStartup) {
- log.info("Connected to hdfs file-system {}:{}/{}", config.getHostName(), config.getPort(), actualPath);
+ log.info("Connected to hdfs file-system {}", hdfsFsDescription);
} else {
- if (log.isDebugEnabled()) {
- log.debug("Connected to hdfs file-system {}:{}/{}", config.getHostName(), config.getPort(), actualPath);
- }
+ log.debug("Connected to hdfs file-system {}", hdfsFsDescription);
}
return answer;
@@ -172,9 +166,9 @@ public class HdfsProducer extends DefaultProducer {
getEndpoint().getCamelContext().getExecutorServiceManager().shutdown(scheduler);
scheduler = null;
}
- if (ostream != null) {
- IOHelper.close(ostream, "output stream", log);
- ostream = null;
+ if (oStream != null) {
+ IOHelper.close(oStream, "output stream", log);
+ oStream = null;
}
}
@@ -195,27 +189,27 @@ public class HdfsProducer extends DefaultProducer {
// if an explicit filename is specified, close any existing stream and append the filename to the hdfsPath
if (exchange.getIn().getHeader(Exchange.FILE_NAME) != null) {
- if (ostream != null) {
- IOHelper.close(ostream, "output stream", log);
+ if (oStream != null) {
+ IOHelper.close(oStream, "output stream", log);
}
StringBuilder actualPath = getHdfsPathUsingFileNameHeader(exchange);
- ostream = HdfsOutputStream.createOutputStream(actualPath.toString(), config);
- } else if (ostream == null) {
- // must have ostream
- ostream = setupHdfs(false);
+ oStream = HdfsOutputStream.createOutputStream(actualPath.toString(), config);
+ } else if (oStream == null) {
+ // must have oStream
+ oStream = setupHdfs(false);
}
if (isSplitRequired(config.getSplitStrategies())) {
- if (ostream != null) {
- IOHelper.close(ostream, "output stream", log);
+ if (oStream != null) {
+ IOHelper.close(oStream, "output stream", log);
}
StringBuilder actualPath = newFileName();
- ostream = HdfsOutputStream.createOutputStream(actualPath.toString(), config);
+ oStream = HdfsOutputStream.createOutputStream(actualPath.toString(), config);
}
- String path = ostream.getActualPath();
+ String path = oStream.getActualPath();
log.trace("Writing body to hdfs-file {}", path);
- ostream.append(key, body, exchange.getContext().getTypeConverter());
+ oStream.append(key, body, exchange.getContext().getTypeConverter());
idle.set(false);
@@ -231,8 +225,8 @@ public class HdfsProducer extends DefaultProducer {
if (close) {
try {
HdfsProducer.this.log.trace("Closing stream");
- ostream.close();
- ostream = null;
+ oStream.close();
+ oStream = null;
} catch (IOException e) {
// ignore
}
@@ -243,6 +237,7 @@ public class HdfsProducer extends DefaultProducer {
/**
* helper method to construct the hdfsPath from the CamelFileName String or Expression
+ *
* @param exchange
* @return
*/
@@ -253,7 +248,7 @@ public class HdfsProducer extends DefaultProducer {
if (value instanceof String) {
fileName = exchange.getContext().getTypeConverter().convertTo(String.class, exchange, value);
} else if (value instanceof Expression) {
- fileName = ((Expression) value).evaluate(exchange, String.class);
+ fileName = ((Expression) value).evaluate(exchange, String.class);
}
return actualPath.append(fileName);
}
@@ -261,7 +256,7 @@ public class HdfsProducer extends DefaultProducer {
private boolean isSplitRequired(List<SplitStrategy> strategies) {
boolean split = false;
for (SplitStrategy splitStrategy : strategies) {
- split |= splitStrategy.getType().split(ostream, splitStrategy.value, this);
+ split |= splitStrategy.getType().split(oStream, splitStrategy.value, this);
}
return split;
}
@@ -285,18 +280,18 @@ public class HdfsProducer extends DefaultProducer {
@Override
public void run() {
- // only run if ostream has been created
- if (ostream == null) {
+ // only run if oStream has been created
+ if (oStream == null) {
return;
}
HdfsProducer.this.log.trace("IdleCheck running");
- if (System.currentTimeMillis() - ostream.getLastAccess() > strategy.value && !idle.get() && !ostream.isBusy().get()) {
+ if (System.currentTimeMillis() - oStream.getLastAccess() > strategy.value && !idle.get() && !oStream.isBusy().get()) {
idle.set(true);
try {
HdfsProducer.this.log.trace("Closing stream as idle");
- ostream.close();
+ oStream.close();
} catch (IOException e) {
// ignore
}
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsSequenceFileType.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsSequenceFileType.java
index 0bfde2e..159183a 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsSequenceFileType.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsSequenceFileType.java
@@ -71,7 +71,7 @@ class HdfsSequenceFileType extends DefaultHdfsFileType {
HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath, configuration);
Class<?> keyWritableClass = configuration.getKeyType().getWritableClass();
Class<?> valueWritableClass = configuration.getValueType().getWritableClass();
- rout = SequenceFile.createWriter(hdfsInfo.getConf(), SequenceFile.Writer.file(hdfsInfo.getPath()), SequenceFile.Writer.keyClass(keyWritableClass),
+ rout = SequenceFile.createWriter(hdfsInfo.getConfiguration(), SequenceFile.Writer.file(hdfsInfo.getPath()), SequenceFile.Writer.keyClass(keyWritableClass),
SequenceFile.Writer.valueClass(valueWritableClass), SequenceFile.Writer.bufferSize(configuration.getBufferSize()),
SequenceFile.Writer.replication(configuration.getReplication()), SequenceFile.Writer.blockSize(configuration.getBlockSize()),
SequenceFile.Writer.compression(configuration.getCompressionType(), configuration.getCompressionCodec().getCodec()),
@@ -88,7 +88,7 @@ class HdfsSequenceFileType extends DefaultHdfsFileType {
try {
Closeable rin;
HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath, configuration);
- rin = new SequenceFile.Reader(hdfsInfo.getConf(), SequenceFile.Reader.file(hdfsInfo.getPath()));
+ rin = new SequenceFile.Reader(hdfsInfo.getConfiguration(), SequenceFile.Reader.file(hdfsInfo.getPath()));
return rin;
} catch (IOException ex) {
throw new RuntimeCamelException(ex);
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/kerberos/KerberosConfigurationBuilder.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/kerberos/KerberosConfigurationBuilder.java
index b5025cc..c753c46 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/kerberos/KerberosConfigurationBuilder.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/kerberos/KerberosConfigurationBuilder.java
@@ -18,6 +18,7 @@ package org.apache.camel.component.hdfs.kerberos;
import java.io.File;
+import org.apache.camel.component.hdfs.HdfsConfiguration;
import org.apache.hadoop.conf.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,10 +37,10 @@ public final class KerberosConfigurationBuilder {
/**
* Add all the kerberos specific settings needed for this authentication mode
*
- * @param kerberosConfigFileLocation - The location of the kerberos config file (on the server)
+ * @param endpointConfig - endpoint configuration supplying the location of the kerberos config file (on the server)
*/
- public static void withKerberosConfiguration(Configuration configuration, String kerberosConfigFileLocation) {
- setKerberosConfigFile(kerberosConfigFileLocation);
+ public static void withKerberosConfiguration(Configuration configuration, HdfsConfiguration endpointConfig) {
+ setKerberosConfigFile(endpointConfig.getKerberosConfigFileLocation());
configuration.set(AUTHENTICATION_MODE, "kerberos");
}
diff --git a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/FromFileToHdfsTest.java b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/FromFileToHdfsTest.java
index a134bce..fdf8d99 100644
--- a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/FromFileToHdfsTest.java
+++ b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/FromFileToHdfsTest.java
@@ -37,7 +37,7 @@ public class FromFileToHdfsTest extends HdfsTestSupport {
@Override
@Before
public void setUp() throws Exception {
- if (!canTest()) {
+ if (skipTest()) {
return;
}
deleteDirectory("target/inbox");
@@ -48,7 +48,7 @@ public class FromFileToHdfsTest extends HdfsTestSupport {
@Override
@After
public void tearDown() throws Exception {
- if (!canTest()) {
+ if (skipTest()) {
return;
}
@@ -61,7 +61,7 @@ public class FromFileToHdfsTest extends HdfsTestSupport {
@Test
public void testFileToHdfs() throws Exception {
- if (!canTest()) {
+ if (skipTest()) {
return;
}
@@ -80,7 +80,7 @@ public class FromFileToHdfsTest extends HdfsTestSupport {
@Test
public void testTwoFilesToHdfs() throws Exception {
- if (!canTest()) {
+ if (skipTest()) {
return;
}
diff --git a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HaConfigurationBuilderTest.java b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HaConfigurationBuilderTest.java
index 3cf20fc..91dffe7 100644
--- a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HaConfigurationBuilderTest.java
+++ b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HaConfigurationBuilderTest.java
@@ -33,21 +33,48 @@ public class HaConfigurationBuilderTest {
public void withClusterConfiguration() {
// given
Configuration configuration = new Configuration();
+ String haClusterName = "haCluster";
List<String> namedNodes = Arrays.asList("kerb_node_01.example.com:8021", "kerb_node_02.example.com:8022");
int replicationFactor = 3;
// when
- HaConfigurationBuilder.withClusterConfiguration(configuration, namedNodes, replicationFactor);
+ HaConfigurationBuilder.withClusterConfiguration(configuration, haClusterName, namedNodes, replicationFactor);
// then
assertThat(configuration, notNullValue());
assertThat(configuration.get(DFSConfigKeys.DFS_REPLICATION_KEY), is("3"));
- assertThat(configuration.get(DFSConfigKeys.DFS_NAMESERVICES), is("hfdsNamedService"));
- assertThat(configuration.get("dfs.ha.namenodes.hfdsNamedService"), is("kerb_node_01_example_com,kerb_node_02_example_com"));
- assertThat(configuration.get("dfs.namenode.rpc-address.hfdsNamedService.kerb_node_01_example_com"), is("kerb_node_01.example.com:8021"));
- assertThat(configuration.get("dfs.namenode.rpc-address.hfdsNamedService.kerb_node_02_example_com"), is("kerb_node_02.example.com:8022"));
- assertThat(configuration.get("dfs.client.failover.proxy.provider.hfdsNamedService"), is("org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"));
- assertThat(configuration.get("fs.defaultFS"), is("hdfs://hfdsNamedService"));
+ assertThat(configuration.get(DFSConfigKeys.DFS_NAMESERVICES), is("haCluster"));
+ assertThat(configuration.get("dfs.ha.namenodes.haCluster"), is("kerb_node_01_example_com,kerb_node_02_example_com"));
+ assertThat(configuration.get("dfs.namenode.rpc-address.haCluster.kerb_node_01_example_com"), is("kerb_node_01.example.com:8021"));
+ assertThat(configuration.get("dfs.namenode.rpc-address.haCluster.kerb_node_02_example_com"), is("kerb_node_02.example.com:8022"));
+ assertThat(configuration.get("dfs.client.failover.proxy.provider.haCluster"), is("org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"));
+ assertThat(configuration.get("fs.defaultFS"), is("hdfs://haCluster"));
+ }
+
+ @Test
+ public void getSanitizedClusterNameWithNull() {
+ // given
+ String haClusterName = null;
+
+ // when
+ String actual = HaConfigurationBuilder.getSanitizedClusterName(haClusterName);
+
+ // then
+ assertThat(actual, notNullValue());
+ assertThat(actual, is("hfdsNamedService"));
+ }
+
+ @Test
+ public void getSanitizedClusterNameWithHostName() {
+ // given
+ String haClusterName = "this.is.a.cluster.host";
+
+ // when
+ String actual = HaConfigurationBuilder.getSanitizedClusterName(haClusterName);
+
+ // then
+ assertThat(actual, notNullValue());
+ assertThat(actual, is("this_is_a_cluster_host"));
}
}
diff --git a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsConsumerTest.java b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsConsumerTest.java
index 0845fbe..f0ec9de 100644
--- a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsConsumerTest.java
+++ b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsConsumerTest.java
@@ -67,7 +67,7 @@ public class HdfsConsumerTest extends HdfsTestSupport {
@Override
@Before
public void setUp() throws Exception {
- if (!canTest()) {
+ if (skipTest()) {
return;
}
@@ -84,7 +84,7 @@ public class HdfsConsumerTest extends HdfsTestSupport {
@Test
public void testSimpleConsumer() throws Exception {
- if (!canTest()) {
+ if (skipTest()) {
return;
}
@@ -113,11 +113,11 @@ public class HdfsConsumerTest extends HdfsTestSupport {
@Test
public void testConcurrentConsumers() throws Exception {
- if (!canTest()) {
+ if (skipTest()) {
return;
}
- final File rootdir = new File(".");
+ final File rootdir = CWD;
final File dir = new File("target/test/multiple-consumers");
dir.mkdirs();
for (int i = 1; i <= ITERATIONS; i++) {
@@ -157,7 +157,7 @@ public class HdfsConsumerTest extends HdfsTestSupport {
@Test
public void testSimpleConsumerWithEmptyFile() throws Exception {
- if (!canTest()) {
+ if (skipTest()) {
return;
}
@@ -186,7 +186,7 @@ public class HdfsConsumerTest extends HdfsTestSupport {
@Test
public void testSimpleConsumerFileWithSizeEqualToNChunks() throws Exception {
- if (!canTest()) {
+ if (skipTest()) {
return;
}
@@ -217,7 +217,7 @@ public class HdfsConsumerTest extends HdfsTestSupport {
@Test
public void testSimpleConsumerWithEmptySequenceFile() throws Exception {
- if (!canTest()) {
+ if (skipTest()) {
return;
}
@@ -242,7 +242,7 @@ public class HdfsConsumerTest extends HdfsTestSupport {
@Test
public void testReadWithReadSuffix() throws Exception {
- if (!canTest()) {
+ if (skipTest()) {
return;
}
@@ -287,7 +287,7 @@ public class HdfsConsumerTest extends HdfsTestSupport {
@Test
public void testReadBoolean() throws Exception {
- if (!canTest()) {
+ if (skipTest()) {
return;
}
@@ -315,7 +315,7 @@ public class HdfsConsumerTest extends HdfsTestSupport {
@Test
public void testReadByte() throws Exception {
- if (!canTest()) {
+ if (skipTest()) {
return;
}
@@ -346,7 +346,7 @@ public class HdfsConsumerTest extends HdfsTestSupport {
@Test
public void testReadFloat() throws Exception {
- if (!canTest()) {
+ if (skipTest()) {
return;
}
@@ -376,7 +376,7 @@ public class HdfsConsumerTest extends HdfsTestSupport {
@Test
public void testReadDouble() throws Exception {
- if (!canTest()) {
+ if (skipTest()) {
return;
}
@@ -406,7 +406,7 @@ public class HdfsConsumerTest extends HdfsTestSupport {
@Test
public void testReadInt() throws Exception {
- if (!canTest()) {
+ if (skipTest()) {
return;
}
@@ -436,7 +436,7 @@ public class HdfsConsumerTest extends HdfsTestSupport {
@Test
public void testReadLong() throws Exception {
- if (!canTest()) {
+ if (skipTest()) {
return;
}
@@ -466,7 +466,7 @@ public class HdfsConsumerTest extends HdfsTestSupport {
@Test
public void testReadBytes() throws Exception {
- if (!canTest()) {
+ if (skipTest()) {
return;
}
@@ -496,7 +496,7 @@ public class HdfsConsumerTest extends HdfsTestSupport {
@Test
public void testReadString() throws Exception {
- if (!canTest()) {
+ if (skipTest()) {
return;
}
@@ -526,7 +526,7 @@ public class HdfsConsumerTest extends HdfsTestSupport {
@Test
public void testReadStringArrayFile() throws Exception {
- if (!canTest()) {
+ if (skipTest()) {
return;
}
@@ -560,7 +560,7 @@ public class HdfsConsumerTest extends HdfsTestSupport {
@Override
@After
public void tearDown() throws Exception {
- if (!canTest()) {
+ if (skipTest()) {
return;
}
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInfoFactory.java b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsInfoTest.java
similarity index 52%
copy from components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInfoFactory.java
copy to components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsInfoTest.java
index 4862a70..eff5fa9 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInfoFactory.java
+++ b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsInfoTest.java
@@ -18,21 +18,29 @@ package org.apache.camel.component.hdfs;
import java.io.IOException;
-import javax.security.auth.login.Configuration;
+import org.junit.Test;
-public final class HdfsInfoFactory {
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.mock;
- private HdfsInfoFactory() {
- }
+public class HdfsInfoTest {
- public static HdfsInfo newHdfsInfo(String hdfsPath, HdfsConfiguration configuration) throws IOException {
- // need to remember auth as Hadoop will override that, which otherwise means the Auth is broken afterwards
- Configuration auth = HdfsComponent.getJAASConfiguration();
- try {
- return new HdfsInfo(hdfsPath, configuration);
- } finally {
- HdfsComponent.setJAASConfiguration(auth);
- }
- }
+ private HdfsInfo underTest;
+
+ @Test
+ public void createHdfsInfo() throws IOException {
+ // given
+ String hdfsPath = "hdfs://localhost/target/test/multiple-consumers";
+ HdfsConfiguration endpointConfig = mock(HdfsConfiguration.class);
-}
+ // when
+ underTest = HdfsInfoFactory.newHdfsInfoWithoutAuth(hdfsPath, endpointConfig);
+
+ // then
+ assertThat(underTest, notNullValue());
+ assertThat(underTest.getConfiguration(), notNullValue());
+ assertThat(underTest.getFileSystem(), notNullValue());
+ assertThat(underTest.getPath(), notNullValue());
+ }
+}
\ No newline at end of file
diff --git a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerConsumerTest.java b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerConsumerTest.java
index 0f71bdd..7e98793 100644
--- a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerConsumerTest.java
+++ b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerConsumerTest.java
@@ -34,7 +34,7 @@ public class HdfsProducerConsumerTest extends HdfsTestSupport {
@Override
@Before
public void setUp() throws Exception {
- if (!canTest()) {
+ if (skipTest()) {
return;
}
super.setUp();
@@ -47,7 +47,7 @@ public class HdfsProducerConsumerTest extends HdfsTestSupport {
@Test
public void testSimpleSplitWriteRead() throws Exception {
- if (!canTest()) {
+ if (skipTest()) {
return;
}
@@ -81,7 +81,7 @@ public class HdfsProducerConsumerTest extends HdfsTestSupport {
@Override
@After
public void tearDown() throws Exception {
- if (!canTest()) {
+ if (skipTest()) {
return;
}
diff --git a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerSplitTest.java b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerSplitTest.java
index b3c2e85..60976f5 100644
--- a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerSplitTest.java
+++ b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerSplitTest.java
@@ -35,7 +35,7 @@ public class HdfsProducerSplitTest extends HdfsTestSupport {
@Override
@Before
public void setUp() throws Exception {
- if (!canTest()) {
+ if (skipTest()) {
return;
}
super.setUp();
@@ -53,7 +53,7 @@ public class HdfsProducerSplitTest extends HdfsTestSupport {
@Test
public void testSimpleWriteFileWithIdleSplit() throws Exception {
- if (!canTest()) {
+ if (skipTest()) {
return;
}
@@ -86,7 +86,7 @@ public class HdfsProducerSplitTest extends HdfsTestSupport {
}
private void doTest(int routeNr) throws Exception {
- if (!canTest()) {
+ if (skipTest()) {
return;
}
@@ -108,7 +108,7 @@ public class HdfsProducerSplitTest extends HdfsTestSupport {
@Override
@After
public void tearDown() throws Exception {
- if (!canTest()) {
+ if (skipTest()) {
return;
}
diff --git a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerTest.java b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerTest.java
index f4cdfc0..a97f29b 100644
--- a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerTest.java
+++ b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerTest.java
@@ -53,7 +53,7 @@ public class HdfsProducerTest extends HdfsTestSupport {
@Override
@Before
public void setUp() throws Exception {
- if (!canTest()) {
+ if (skipTest()) {
return;
}
super.setUp();
@@ -61,7 +61,7 @@ public class HdfsProducerTest extends HdfsTestSupport {
@Test
public void testProducer() throws Exception {
- if (!canTest()) {
+ if (skipTest()) {
return;
}
template.sendBody("direct:start1", "PAPPO");
@@ -79,7 +79,7 @@ public class HdfsProducerTest extends HdfsTestSupport {
@Test
public void testProducerClose() throws Exception {
- if (!canTest()) {
+ if (skipTest()) {
return;
}
for (int i = 0; i < 10; ++i) {
@@ -105,7 +105,7 @@ public class HdfsProducerTest extends HdfsTestSupport {
@Test
public void testWriteBoolean() throws Exception {
- if (!canTest()) {
+ if (skipTest()) {
return;
}
Boolean aBoolean = true;
@@ -125,7 +125,7 @@ public class HdfsProducerTest extends HdfsTestSupport {
@Test
public void testWriteByte() throws Exception {
- if (!canTest()) {
+ if (skipTest()) {
return;
}
byte aByte = 8;
@@ -145,7 +145,7 @@ public class HdfsProducerTest extends HdfsTestSupport {
@Test
public void testWriteInt() throws Exception {
- if (!canTest()) {
+ if (skipTest()) {
return;
}
int anInt = 1234;
@@ -165,7 +165,7 @@ public class HdfsProducerTest extends HdfsTestSupport {
@Test
public void testWriteFloat() throws Exception {
- if (!canTest()) {
+ if (skipTest()) {
return;
}
float aFloat = 12.34f;
@@ -185,7 +185,7 @@ public class HdfsProducerTest extends HdfsTestSupport {
@Test
public void testWriteDouble() throws Exception {
- if (!canTest()) {
+ if (skipTest()) {
return;
}
Double aDouble = 12.34D;
@@ -205,7 +205,7 @@ public class HdfsProducerTest extends HdfsTestSupport {
@Test
public void testWriteLong() throws Exception {
- if (!canTest()) {
+ if (skipTest()) {
return;
}
long aLong = 1234567890;
@@ -225,7 +225,7 @@ public class HdfsProducerTest extends HdfsTestSupport {
@Test
public void testWriteText() throws Exception {
- if (!canTest()) {
+ if (skipTest()) {
return;
}
String txt = "CIAO MONDO !";
@@ -245,7 +245,7 @@ public class HdfsProducerTest extends HdfsTestSupport {
@Test
public void testWriteTextWithKey() throws Exception {
- if (!canTest()) {
+ if (skipTest()) {
return;
}
String txtKey = "THEKEY";
@@ -266,7 +266,7 @@ public class HdfsProducerTest extends HdfsTestSupport {
@Test
public void testMapWriteTextWithKey() throws Exception {
- if (!canTest()) {
+ if (skipTest()) {
return;
}
String txtKey = "THEKEY";
@@ -286,7 +286,7 @@ public class HdfsProducerTest extends HdfsTestSupport {
@Test
public void testArrayWriteText() throws Exception {
- if (!canTest()) {
+ if (skipTest()) {
return;
}
String txtValue = "CIAO MONDO !";
@@ -305,7 +305,7 @@ public class HdfsProducerTest extends HdfsTestSupport {
@Test
public void testBloomMapWriteText() throws Exception {
- if (!canTest()) {
+ if (skipTest()) {
return;
}
String txtKey = "THEKEY";
@@ -325,7 +325,7 @@ public class HdfsProducerTest extends HdfsTestSupport {
@Test
public void testWriteTextWithDynamicFilename() throws Exception {
- if (!canTest()) {
+ if (skipTest()) {
return;
}
@@ -348,7 +348,7 @@ public class HdfsProducerTest extends HdfsTestSupport {
@Test
public void testWriteTextWithDynamicFilenameExpression() throws Exception {
- if (!canTest()) {
+ if (skipTest()) {
return;
}
@@ -372,7 +372,7 @@ public class HdfsProducerTest extends HdfsTestSupport {
@Override
@After
public void tearDown() throws Exception {
- if (!canTest()) {
+ if (skipTest()) {
return;
}
super.tearDown();
diff --git a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsTestSupport.java b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsTestSupport.java
index 6399fc0..e406014 100644
--- a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsTestSupport.java
+++ b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsTestSupport.java
@@ -16,25 +16,55 @@
*/
package org.apache.camel.component.hdfs;
+import java.io.File;
+import java.util.Objects;
+
import org.apache.camel.test.junit4.CamelTestSupport;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.util.Shell;
public abstract class HdfsTestSupport extends CamelTestSupport {
- public boolean canTest() {
+ public static final File CWD = new File(".");
+
+ private static Boolean skipTests;
+
+ public boolean skipTest() {
+ if (Objects.isNull(skipTests)) {
+ skipTests = notConfiguredToRunTests();
+ }
+
+ return skipTests;
+ }
+
+ private boolean notConfiguredToRunTests() {
+ return isJavaFromIbm() || missingLocalHadoopConfiguration() || missingAuthenticationConfiguration();
+ }
+
+ private static boolean isJavaFromIbm() {
// Hadoop doesn't run on IBM JDK
- if (System.getProperty("java.vendor").contains("IBM")) {
- return false;
+ return System.getProperty("java.vendor").contains("IBM");
+ }
+
+ private static boolean missingLocalHadoopConfiguration() {
+ boolean hasLocalHadoop;
+ try {
+ String hadoopHome = Shell.getHadoopHome();
+ hasLocalHadoop = StringUtils.isNotEmpty(hadoopHome);
+ } catch (Throwable e) {
+ hasLocalHadoop = false;
}
+ return !hasLocalHadoop;
+ }
- // must be able to get security configuration
+ private boolean missingAuthenticationConfiguration() {
try {
javax.security.auth.login.Configuration.getConfiguration();
+ return false;
} catch (Exception e) {
log.debug("Cannot run test due security exception", e);
- return false;
+ return true;
}
-
- return true;
}
}
diff --git a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/kerberos/KerberosAuthenticationTest.java b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/kerberos/KerberosAuthenticationTest.java
index ca94682..383a45d 100644
--- a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/kerberos/KerberosAuthenticationTest.java
+++ b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/kerberos/KerberosAuthenticationTest.java
@@ -16,13 +16,14 @@
*/
package org.apache.camel.component.hdfs.kerberos;
-import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.junit.Test;
+import static org.apache.camel.component.hdfs.HdfsTestSupport.CWD;
+
public class KerberosAuthenticationTest {
private KerberosAuthentication underTest;
@@ -33,7 +34,7 @@ public class KerberosAuthenticationTest {
Configuration configuration = new Configuration();
String username = "test_user";
- String keyTabFileLocation = pwd() + "/src/test/resources/kerberos/test-keytab.bin";
+ String keyTabFileLocation = CWD.getAbsolutePath() + "/src/test/resources/kerberos/test-keytab.bin";
underTest = new KerberosAuthentication(configuration, username, keyTabFileLocation);
@@ -50,7 +51,7 @@ public class KerberosAuthenticationTest {
Configuration configuration = new Configuration();
String username = "test_user";
- String keyTabFileLocation = pwd() + "/src/test/resources/kerberos/missing.bin";
+ String keyTabFileLocation = CWD.getAbsolutePath() + "/src/test/resources/kerberos/missing.bin";
underTest = new KerberosAuthentication(configuration, username, keyTabFileLocation);
@@ -61,8 +62,4 @@ public class KerberosAuthenticationTest {
/* exception was thrown */
}
- private String pwd() {
- return new File(".").getAbsolutePath();
- }
-
}
diff --git a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/kerberos/KerberosConfigurationBuilderTest.java b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/kerberos/KerberosConfigurationBuilderTest.java
index aa00a82..af75002 100644
--- a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/kerberos/KerberosConfigurationBuilderTest.java
+++ b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/kerberos/KerberosConfigurationBuilderTest.java
@@ -16,13 +16,9 @@
*/
package org.apache.camel.component.hdfs.kerberos;
-import java.io.File;
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.hadoop.conf.Configuration;
import org.junit.Test;
+import static org.apache.camel.component.hdfs.HdfsTestSupport.CWD;
import static org.junit.Assert.*;
public class KerberosConfigurationBuilderTest {
@@ -30,7 +26,7 @@ public class KerberosConfigurationBuilderTest {
@Test
public void withKerberosConfiguration() {
// given
- String kerberosConfigFileLocation = pwd() + "/src/test/resources/kerberos/test-kerb5.conf";
+ String kerberosConfigFileLocation = CWD.getAbsolutePath() + "/src/test/resources/kerberos/test-kerb5.conf";
// when
KerberosConfigurationBuilder.setKerberosConfigFile(kerberosConfigFileLocation);
@@ -43,7 +39,7 @@ public class KerberosConfigurationBuilderTest {
public void setKerberosConfigFileWithRealFile() {
// given
String kerb5FileName = "test-kerb5.conf";
- String kerberosConfigFileLocation = pwd() + "/src/test/resources/kerberos/" + kerb5FileName;
+ String kerberosConfigFileLocation = CWD.getAbsolutePath() + "/src/test/resources/kerberos/" + kerb5FileName;
// when
KerberosConfigurationBuilder.setKerberosConfigFile(kerberosConfigFileLocation);
@@ -58,7 +54,7 @@ public class KerberosConfigurationBuilderTest {
public void setKerberosConfigFileWithMissingFile() {
// given
String kerb5FileName = "missing-kerb5.conf";
- String kerberosConfigFileLocation = pwd() + "/src/test/resources/kerberos/" + kerb5FileName;
+ String kerberosConfigFileLocation = CWD.getAbsolutePath() + "/src/test/resources/kerberos/" + kerb5FileName;
// when
KerberosConfigurationBuilder.setKerberosConfigFile(kerberosConfigFileLocation);
@@ -68,8 +64,4 @@ public class KerberosConfigurationBuilderTest {
assertNull(actual);
}
- private String pwd() {
- return new File(".").getAbsolutePath();
- }
-
}