You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2015/07/14 23:17:25 UTC
[07/13] incubator-nifi git commit: NIFI-589: Updated with first
review feedback
NIFI-589: Updated with first review feedback
* Switched to using `getResourceAsStream()` where possible
* Removed trailing whitespace from added files
* Added missing license headers
* Added RAT exception to testdata files
* Fixed POM errors that broke the build
* Switched to using TemporaryFolder instead of putting files in
`target`
* Used try-with-resources where needed to autoclose streams
* Moved logging configuration to properties files
* Removed AbstractFlumeTest
* Fixed logging levels in test code
Signed-off-by: Matt Gilman <ma...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/3529bb33
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/3529bb33
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/3529bb33
Branch: refs/heads/develop
Commit: 3529bb33178ce70a33cc7ab21d7334ba3411048b
Parents: 3b9e482
Author: Joey Echeverria <jo...@gmail.com>
Authored: Fri Jun 5 14:00:37 2015 -0700
Committer: Matt Gilman <ma...@gmail.com>
Committed: Tue Jul 14 14:50:16 2015 -0400
----------------------------------------------------------------------
.../org/apache/nifi/util/file/FileUtils.java | 27 +++-
.../nifi-flume-processors/pom.xml | 127 +++++++++++--------
.../nifi/processors/flume/NifiChannel.java | 18 ++-
.../processors/flume/NifiChannelSelector.java | 18 ++-
.../nifi/processors/flume/NifiTransaction.java | 17 ++-
.../processors/flume/util/FlowFileEvent.java | 21 ++-
.../flume/util/FlowFileEventConstants.java | 17 ++-
.../processors/flume/AbstractFlumeTest.java | 35 -----
.../flume/FlumeSinkProcessorTest.java | 59 +++++----
.../flume/FlumeSourceProcessorTest.java | 25 ++--
.../src/test/resources/core-site.xml | 5 +
.../src/test/resources/log4j.properties | 20 +++
.../src/test/resources/simplelogger.properties | 20 +++
nifi/pom.xml | 1 +
14 files changed, 261 insertions(+), 149 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3529bb33/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/file/FileUtils.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/file/FileUtils.java b/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/file/FileUtils.java
index 7661e2d..ff4da8e 100644
--- a/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/file/FileUtils.java
+++ b/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/util/file/FileUtils.java
@@ -560,6 +560,19 @@ public class FileUtils {
* @throws IOException if the MD5 hash could not be computed
*/
public static byte[] computeMd5Digest(final File file) throws IOException {
+ try (final FileInputStream fis = new FileInputStream(file)) {
+ return computeMd5Digest(fis);
+ }
+ }
+
+ /**
+ * Returns the MD5 hash of the given stream.
+ *
+ * @param stream an input stream
+ * @return the MD5 hash
+ * @throws IOException if the MD5 hash could not be computed
+ */
+ public static byte[] computeMd5Digest(final InputStream stream) throws IOException {
final MessageDigest digest;
try {
digest = MessageDigest.getInstance("MD5");
@@ -567,15 +580,15 @@ public class FileUtils {
throw new IOException(nsae);
}
- try (final FileInputStream fis = new FileInputStream(file)) {
- int len;
- final byte[] buffer = new byte[8192];
- while ((len = fis.read(buffer)) > -1) {
- if (len > 0) {
- digest.update(buffer, 0, len);
- }
+
+ int len;
+ final byte[] buffer = new byte[8192];
+ while ((len = stream.read(buffer)) > -1) {
+ if (len > 0) {
+ digest.update(buffer, 0, len);
}
}
+
return digest.digest();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3529bb33/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/pom.xml b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/pom.xml
index bd26a99..b903f21 100644
--- a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/pom.xml
+++ b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/pom.xml
@@ -38,14 +38,14 @@
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-flowfile-packager</artifactId>
</dependency>
- <dependency>
- <groupId>org.apache.flume</groupId>
- <artifactId>flume-ng-sdk</artifactId>
+ <dependency>
+ <groupId>org.apache.flume</groupId>
+ <artifactId>flume-ng-sdk</artifactId>
<version>1.5.2</version>
- </dependency>
- <dependency>
- <groupId>org.apache.flume</groupId>
- <artifactId>flume-ng-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flume</groupId>
+ <artifactId>flume-ng-core</artifactId>
<version>1.5.2</version>
<exclusions>
<exclusion>
@@ -53,74 +53,89 @@
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
- </dependency>
+ </dependency>
<!-- Flume Sources -->
- <dependency>
- <groupId>org.apache.flume.flume-ng-sources</groupId>
- <artifactId>flume-twitter-source</artifactId>
+ <dependency>
+ <groupId>org.apache.flume.flume-ng-sources</groupId>
+ <artifactId>flume-twitter-source</artifactId>
<version>1.5.2</version>
- </dependency>
- <dependency>
- <groupId>org.apache.flume.flume-ng-sources</groupId>
- <artifactId>flume-jms-source</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flume.flume-ng-sources</groupId>
+ <artifactId>flume-jms-source</artifactId>
<version>1.5.2</version>
- </dependency>
- <dependency>
- <groupId>org.apache.flume.flume-ng-sources</groupId>
- <artifactId>flume-scribe-source</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flume.flume-ng-sources</groupId>
+ <artifactId>flume-scribe-source</artifactId>
<version>1.5.2</version>
- </dependency>
+ </dependency>
<!-- Flume Sinks -->
- <dependency>
- <groupId>org.apache.flume.flume-ng-sinks</groupId>
- <artifactId>flume-hdfs-sink</artifactId>
+ <dependency>
+ <groupId>org.apache.flume.flume-ng-sinks</groupId>
+ <artifactId>flume-hdfs-sink</artifactId>
<version>1.5.2</version>
- </dependency>
+ </dependency>
<!-- HDFS sink dependencies -->
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-common</artifactId>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-hdfs</artifactId>
- <scope>provided</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flume.flume-ng-sinks</groupId>
- <artifactId>flume-irc-sink</artifactId>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flume.flume-ng-sinks</groupId>
+ <artifactId>flume-irc-sink</artifactId>
<version>1.5.2</version>
- </dependency>
- <dependency>
- <groupId>org.apache.flume.flume-ng-sinks</groupId>
- <artifactId>flume-ng-elasticsearch-sink</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flume.flume-ng-sinks</groupId>
+ <artifactId>flume-ng-elasticsearch-sink</artifactId>
<version>1.5.2</version>
- </dependency>
- <dependency>
- <groupId>org.apache.flume.flume-ng-sinks</groupId>
- <artifactId>flume-ng-hbase-sink</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flume.flume-ng-sinks</groupId>
+ <artifactId>flume-ng-hbase-sink</artifactId>
<version>1.5.2</version>
- </dependency>
- <dependency>
- <groupId>org.apache.flume.flume-ng-sinks</groupId>
- <artifactId>flume-ng-morphline-solr-sink</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flume.flume-ng-sinks</groupId>
+ <artifactId>flume-ng-morphline-solr-sink</artifactId>
<version>1.5.2</version>
- </dependency>
+ </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
<scope>test</scope>
</dependency>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-simple</artifactId>
- <scope>test</scope>
- </dependency>
</dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.rat</groupId>
+ <artifactId>apache-rat-plugin</artifactId>
+ <configuration>
+ <excludes>
+ <exclude>src/test/resources/testdata/*</exclude> <!-- test data -->
+ </excludes>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
</project>
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3529bb33/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/NifiChannel.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/NifiChannel.java b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/NifiChannel.java
index ac8dbe2..c4d3bef 100644
--- a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/NifiChannel.java
+++ b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/NifiChannel.java
@@ -1,4 +1,19 @@
-
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.nifi.processors.flume;
import org.apache.flume.Context;
@@ -24,7 +39,6 @@ public class NifiChannel extends BasicChannelSemantics {
@Override
public void configure(Context context) {
- throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3529bb33/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/NifiChannelSelector.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/NifiChannelSelector.java b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/NifiChannelSelector.java
index 792678b..2b0ba77 100644
--- a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/NifiChannelSelector.java
+++ b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/NifiChannelSelector.java
@@ -1,4 +1,19 @@
-
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.nifi.processors.flume;
import com.google.common.collect.ImmutableList;
@@ -36,7 +51,6 @@ public class NifiChannelSelector implements ChannelSelector {
@Override
public void setChannels(List<Channel> channels) {
- throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates.
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3529bb33/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/NifiTransaction.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/NifiTransaction.java b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/NifiTransaction.java
index 3d6a647..37c8a50 100644
--- a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/NifiTransaction.java
+++ b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/NifiTransaction.java
@@ -1,4 +1,19 @@
-
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.nifi.processors.flume;
import org.apache.flume.Event;
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3529bb33/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/util/FlowFileEvent.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/util/FlowFileEvent.java b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/util/FlowFileEvent.java
index c3531ca..5dc97d6 100644
--- a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/util/FlowFileEvent.java
+++ b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/util/FlowFileEvent.java
@@ -1,4 +1,19 @@
-
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.nifi.processors.flume.util;
import com.google.common.collect.Maps;
@@ -56,7 +71,7 @@ public class FlowFileEvent implements Event {
}
headers.put(LINEAGE_START_DATE_HEADER, Long.toString(flowFile.getLineageStartDate()));
headers.put(SIZE_HEADER, Long.toString(flowFile.getSize()));
-
+
headersLoaded = true;
}
}
@@ -83,7 +98,7 @@ public class FlowFileEvent implements Event {
if (flowFile.getSize() > Integer.MAX_VALUE) {
throw new RuntimeException("Can't get body of Event because the backing FlowFile is too large (" + flowFile.getSize() + " bytes)");
}
-
+
final ByteArrayOutputStream baos = new ByteArrayOutputStream((int) flowFile.getSize());
session.read(flowFile, new InputStreamCallback() {
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3529bb33/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/util/FlowFileEventConstants.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/util/FlowFileEventConstants.java b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/util/FlowFileEventConstants.java
index c13f0ef..c9650c1 100644
--- a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/util/FlowFileEventConstants.java
+++ b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/main/java/org/apache/nifi/processors/flume/util/FlowFileEventConstants.java
@@ -1,4 +1,19 @@
-
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.nifi.processors.flume.util;
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3529bb33/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/AbstractFlumeTest.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/AbstractFlumeTest.java b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/AbstractFlumeTest.java
deleted file mode 100644
index 87b056a..0000000
--- a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/AbstractFlumeTest.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.flume;
-
-import org.junit.BeforeClass;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class AbstractFlumeTest {
-
- private static Logger logger;
-
- @BeforeClass
- public static void setUpClass() {
- System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "info");
- System.setProperty("org.slf4j.simpleLogger.showDateTime", "true");
- System.setProperty("org.slf4j.simpleLogger.log.nifi.processors.flume", "debug");
- logger = LoggerFactory.getLogger(AbstractFlumeTest.class);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3529bb33/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/FlumeSinkProcessorTest.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/FlumeSinkProcessorTest.java b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/FlumeSinkProcessorTest.java
index d22514f..2e10c24 100644
--- a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/FlumeSinkProcessorTest.java
+++ b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/FlumeSinkProcessorTest.java
@@ -19,9 +19,9 @@ package org.apache.nifi.processors.flume;
import java.io.File;
import static org.junit.Assert.assertEquals;
-import java.io.FileInputStream;
import java.io.FilenameFilter;
import java.io.IOException;
+import java.io.InputStream;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
@@ -39,15 +39,20 @@ import org.apache.nifi.util.TestRunners;
import org.apache.nifi.util.file.FileUtils;
import org.junit.Assert;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class FlumeSinkProcessorTest {
- private static final Logger logger =
- LoggerFactory.getLogger(FlumeSinkProcessorTest.class);
-
+ private static final Logger logger =
+ LoggerFactory.getLogger(FlumeSinkProcessorTest.class);
+
+ @Rule
+ public final TemporaryFolder temp = new TemporaryFolder();
+
@Test
public void testValidators() {
TestRunner runner = TestRunners.newTestRunner(FlumeSinkProcessor.class);
@@ -62,7 +67,7 @@ public class FlumeSinkProcessorTest {
}
Assert.assertEquals(1, results.size());
for (ValidationResult vr : results) {
- logger.error(vr.toString());
+ logger.debug(vr.toString());
Assert.assertTrue(vr.toString().contains("is invalid because Sink Type is required"));
}
@@ -76,7 +81,7 @@ public class FlumeSinkProcessorTest {
}
Assert.assertEquals(1, results.size());
for (ValidationResult vr : results) {
- logger.error(vr.toString());
+ logger.debug(vr.toString());
Assert.assertTrue(vr.toString().contains("is invalid because unable to load sink"));
}
@@ -90,7 +95,7 @@ public class FlumeSinkProcessorTest {
}
Assert.assertEquals(1, results.size());
for (ValidationResult vr : results) {
- logger.error(vr.toString());
+ logger.debug(vr.toString());
Assert.assertTrue(vr.toString().contains("is invalid because unable to create sink"));
}
@@ -109,12 +114,12 @@ public class FlumeSinkProcessorTest {
public void testNullSink() throws IOException {
TestRunner runner = TestRunners.newTestRunner(FlumeSinkProcessor.class);
runner.setProperty(FlumeSinkProcessor.SINK_TYPE, NullSink.class.getName());
- FileInputStream fis = new FileInputStream("src/test/resources/testdata/records.txt");
- Map<String, String> attributes = new HashMap<>();
- attributes.put(CoreAttributes.FILENAME.key(), "records.txt");
- runner.enqueue(fis, attributes);
- runner.run();
- fis.close();
+ try (InputStream inputStream = getClass().getResourceAsStream("/testdata/records.txt")) {
+ Map<String, String> attributes = new HashMap<>();
+ attributes.put(CoreAttributes.FILENAME.key(), "records.txt");
+ runner.enqueue(inputStream, attributes);
+ runner.run();
+ }
}
@Test
@@ -129,15 +134,10 @@ public class FlumeSinkProcessorTest {
}
runner.run();
}
-
+
@Test
public void testHdfsSink() throws IOException {
- File destDir = new File("target/hdfs");
- if (destDir.exists()) {
- FileUtils.deleteFilesInDir(destDir, null, logger);
- } else {
- destDir.mkdirs();
- }
+ File destDir = temp.newFolder("hdfs");
TestRunner runner = TestRunners.newTestRunner(FlumeSinkProcessor.class);
runner.setProperty(FlumeSinkProcessor.SINK_TYPE, "hdfs");
@@ -147,19 +147,22 @@ public class FlumeSinkProcessorTest {
"tier1.sinks.sink-1.hdfs.serializer = TEXT\n" +
"tier1.sinks.sink-1.serializer.appendNewline = false"
);
- FileInputStream fis = new FileInputStream("src/test/resources/testdata/records.txt");
- Map<String, String> attributes = new HashMap<>();
- attributes.put(CoreAttributes.FILENAME.key(), "records.txt");
- runner.enqueue(fis, attributes);
- runner.run();
- fis.close();
+ try (InputStream inputStream = getClass().getResourceAsStream("/testdata/records.txt")) {
+ Map<String, String> attributes = new HashMap<>();
+ attributes.put(CoreAttributes.FILENAME.key(), "records.txt");
+ runner.enqueue(inputStream, attributes);
+ runner.run();
+ }
File[] files = destDir.listFiles((FilenameFilter)HiddenFileFilter.VISIBLE);
assertEquals("Unexpected number of destination files.", 1, files.length);
File dst = files[0];
- byte[] expectedMd5 = FileUtils.computeMd5Digest(new File("src/test/resources/testdata/records.txt"));
+ byte[] expectedMd5;
+ try (InputStream md5Stream = getClass().getResourceAsStream("/testdata/records.txt")) {
+ expectedMd5 = FileUtils.computeMd5Digest(md5Stream);
+ }
byte[] actualMd5 = FileUtils.computeMd5Digest(dst);
Assert.assertArrayEquals("Destination file doesn't match source data", expectedMd5, actualMd5);
}
-
+
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3529bb33/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/FlumeSourceProcessorTest.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/FlumeSourceProcessorTest.java b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/FlumeSourceProcessorTest.java
index bbcf116..043e115 100644
--- a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/FlumeSourceProcessorTest.java
+++ b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/java/org/apache/nifi/processors/flume/FlumeSourceProcessorTest.java
@@ -34,15 +34,18 @@ import org.apache.nifi.util.TestRunners;
import org.apache.nifi.util.file.FileUtils;
import org.junit.Assert;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class FlumeSourceProcessorTest {
- private static final Logger logger =
- LoggerFactory.getLogger(FlumeSourceProcessorTest.class);
+ private static final Logger logger = LoggerFactory.getLogger(FlumeSourceProcessorTest.class);
+ @Rule
+ public final TemporaryFolder temp = new TemporaryFolder();
@Test
public void testValidators() {
@@ -58,7 +61,7 @@ public class FlumeSourceProcessorTest {
}
Assert.assertEquals(1, results.size());
for (ValidationResult vr : results) {
- logger.error(vr.toString());
+ logger.debug(vr.toString());
Assert.assertTrue(vr.toString().contains("is invalid because Source Type is required"));
}
@@ -72,7 +75,7 @@ public class FlumeSourceProcessorTest {
}
Assert.assertEquals(1, results.size());
for (ValidationResult vr : results) {
- logger.error(vr.toString());
+ logger.debug(vr.toString());
Assert.assertTrue(vr.toString().contains("is invalid because unable to load source"));
}
@@ -86,7 +89,7 @@ public class FlumeSourceProcessorTest {
}
Assert.assertEquals(1, results.size());
for (ValidationResult vr : results) {
- logger.error(vr.toString());
+ logger.debug(vr.toString());
Assert.assertTrue(vr.toString().contains("is invalid because unable to create source"));
}
@@ -108,22 +111,16 @@ public class FlumeSourceProcessorTest {
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(FlumeSourceProcessor.SUCCESS);
Assert.assertEquals(1, flowFiles.size());
for (MockFlowFile flowFile : flowFiles) {
- logger.error(flowFile.toString());
+ logger.debug(flowFile.toString());
Assert.assertEquals(1, flowFile.getSize());
}
}
@Test
public void testSourceWithConfig() throws IOException {
- File spoolDirectory = new File("target/spooldir");
- if (spoolDirectory.exists()) {
- FileUtils.deleteFilesInDir(spoolDirectory, null, logger);
- } else {
- spoolDirectory.mkdirs();
- }
- File src = new File("src/test/resources/testdata/records.txt");
+ File spoolDirectory = temp.newFolder("spooldir");
File dst = new File(spoolDirectory, "records.txt");
- FileUtils.copyFile(src, dst, false, false, logger);
+ FileUtils.copyFile(getClass().getResourceAsStream("/testdata/records.txt"), dst, true, false);
TestRunner runner = TestRunners.newTestRunner(FlumeSourceProcessor.class);
runner.setProperty(FlumeSourceProcessor.SOURCE_TYPE, "spooldir");
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3529bb33/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/resources/core-site.xml
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/resources/core-site.xml b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/resources/core-site.xml
index 5e3b55c..849854b 100644
--- a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/resources/core-site.xml
+++ b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/resources/core-site.xml
@@ -20,6 +20,11 @@
<configuration>
<property>
<name>fs.defaultFS</name>
+ <!--
+ Hadoop doesn't support a chroot style operation for the
+ local filesystem so there's no benefit to setting this
+ to a directory other than '/'
+ -->
<value>file:///</value>
</property>
</configuration>
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3529bb33/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/resources/log4j.properties b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/resources/log4j.properties
new file mode 100644
index 0000000..8c502ec
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/resources/log4j.properties
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+log4j.rootLogger=INFO, CONSOLE
+
+log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
+
+log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
+log4j.appender.CONSOLE.layout.ConversionPattern=%-4r [%t] %-5p %c %x \u2013 %m%n
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3529bb33/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/resources/simplelogger.properties
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/resources/simplelogger.properties b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/resources/simplelogger.properties
new file mode 100644
index 0000000..4994e7f
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-flume-bundle/nifi-flume-processors/src/test/resources/simplelogger.properties
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+org.slf4j.simpleLogger.logFile=System.out
+org.slf4j.simpleLogger.defaultLogLevel=info
+org.slf4j.simpleLogger.showDateTime=true
+org.slf4j.simpleLogger.dateTimeFormat=yyyy-MM-dd HH:mm:ss.SSS
+org.slf4j.simpleLogger.levelInBrackets=true
+org.slf4j.simpleLogger.log.org.apache.nifi.processors.flume=debug
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3529bb33/nifi/pom.xml
----------------------------------------------------------------------
diff --git a/nifi/pom.xml b/nifi/pom.xml
index 682a426..3a5c6b7 100644
--- a/nifi/pom.xml
+++ b/nifi/pom.xml
@@ -805,6 +805,7 @@
<artifactId>nifi-geo-nar</artifactId>
<version>0.2.0-incubating-SNAPSHOT</version>
<type>nar</type>
+ </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-flume-nar</artifactId>