You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2016/05/19 19:15:19 UTC
[5/5] nifi git commit: NIFI-1811 Renaming MockProcessorLogger to
MockComponentLogger for consistency. Removing unused imports from
ExecuteScript causing checkstyle failures.
NIFI-1811 Renaming MockProcessorLogger to MockComponentLogger for consistency. Removing unused imports from ExecuteScript causing checkstyle failures.
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/1bd2cf0d
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/1bd2cf0d
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/1bd2cf0d
Branch: refs/heads/master
Commit: 1bd2cf0d09a7111bcecffd0f473aa71c25a69845
Parents: 372ffb8
Author: Aldrin Piri <al...@apache.org>
Authored: Thu May 19 13:03:29 2016 -0400
Committer: Aldrin Piri <al...@apache.org>
Committed: Thu May 19 14:38:41 2016 -0400
----------------------------------------------------------------------
.../init/ControllerServiceInitializer.java | 4 +-
.../init/ProcessorInitializer.java | 4 +-
.../init/ReportingTaskingInitializer.java | 4 +-
.../documentation/mock/MockComponentLogger.java | 258 +++++++++++
...kControllerServiceInitializationContext.java | 2 +-
.../MockProcessorInitializationContext.java | 2 +-
.../documentation/mock/MockProcessorLogger.java | 258 -----------
.../MockReportingInitializationContext.java | 2 +-
.../apache/nifi/controller/FlowController.java | 6 +-
.../StandardProcessorInitializationContext.java | 4 +-
.../nifi/processors/hive/SelectHiveQL.java | 30 +-
.../processors/kafka/pubsub/KafkaPublisher.java | 8 +-
.../AbstractKafkaProcessorLifecycelTest.java | 456 -------------------
.../AbstractKafkaProcessorLifecycleTest.java | 456 +++++++++++++++++++
.../nifi/processors/script/ExecuteScript.java | 2 -
.../nifi/processors/standard/TransformJSON.java | 10 +-
16 files changed, 752 insertions(+), 754 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/1bd2cf0d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/init/ControllerServiceInitializer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/init/ControllerServiceInitializer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/init/ControllerServiceInitializer.java
index 7a6d62c..4d1651e 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/init/ControllerServiceInitializer.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/init/ControllerServiceInitializer.java
@@ -22,7 +22,7 @@ import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.documentation.ConfigurableComponentInitializer;
import org.apache.nifi.documentation.mock.MockConfigurationContext;
import org.apache.nifi.documentation.mock.MockControllerServiceInitializationContext;
-import org.apache.nifi.documentation.mock.MockProcessorLogger;
+import org.apache.nifi.documentation.mock.MockComponentLogger;
import org.apache.nifi.documentation.util.ReflectionUtils;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.nar.NarCloseable;
@@ -49,7 +49,7 @@ public class ControllerServiceInitializer implements ConfigurableComponentInitia
try (NarCloseable narCloseable = NarCloseable.withNarLoader()) {
ControllerService controllerService = (ControllerService) component;
- final ComponentLog logger = new MockProcessorLogger();
+ final ComponentLog logger = new MockComponentLogger();
final MockConfigurationContext context = new MockConfigurationContext();
ReflectionUtils.quietlyInvokeMethodsWithAnnotations(OnShutdown.class, org.apache.nifi.processor.annotation.OnShutdown.class, controllerService, logger, context);
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/1bd2cf0d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/init/ProcessorInitializer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/init/ProcessorInitializer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/init/ProcessorInitializer.java
index 7fe4946..7a66f72 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/init/ProcessorInitializer.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/init/ProcessorInitializer.java
@@ -21,7 +21,7 @@ import org.apache.nifi.components.ConfigurableComponent;
import org.apache.nifi.documentation.ConfigurableComponentInitializer;
import org.apache.nifi.documentation.mock.MockProcessContext;
import org.apache.nifi.documentation.mock.MockProcessorInitializationContext;
-import org.apache.nifi.documentation.mock.MockProcessorLogger;
+import org.apache.nifi.documentation.mock.MockComponentLogger;
import org.apache.nifi.documentation.util.ReflectionUtils;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.nar.NarCloseable;
@@ -47,7 +47,7 @@ public class ProcessorInitializer implements ConfigurableComponentInitializer {
Processor processor = (Processor) component;
try (NarCloseable narCloseable = NarCloseable.withNarLoader()) {
- final ComponentLog logger = new MockProcessorLogger();
+ final ComponentLog logger = new MockComponentLogger();
final MockProcessContext context = new MockProcessContext();
ReflectionUtils.quietlyInvokeMethodsWithAnnotations(OnShutdown.class, org.apache.nifi.processor.annotation.OnShutdown.class, processor, logger, context);
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/1bd2cf0d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/init/ReportingTaskingInitializer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/init/ReportingTaskingInitializer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/init/ReportingTaskingInitializer.java
index 171f1d9..32e878c 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/init/ReportingTaskingInitializer.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/init/ReportingTaskingInitializer.java
@@ -20,7 +20,7 @@ import org.apache.nifi.annotation.lifecycle.OnShutdown;
import org.apache.nifi.components.ConfigurableComponent;
import org.apache.nifi.documentation.ConfigurableComponentInitializer;
import org.apache.nifi.documentation.mock.MockConfigurationContext;
-import org.apache.nifi.documentation.mock.MockProcessorLogger;
+import org.apache.nifi.documentation.mock.MockComponentLogger;
import org.apache.nifi.documentation.mock.MockReportingInitializationContext;
import org.apache.nifi.documentation.util.ReflectionUtils;
import org.apache.nifi.nar.NarCloseable;
@@ -48,7 +48,7 @@ public class ReportingTaskingInitializer implements ConfigurableComponentInitial
try (NarCloseable narCloseable = NarCloseable.withNarLoader()) {
final MockConfigurationContext context = new MockConfigurationContext();
- ReflectionUtils.quietlyInvokeMethodsWithAnnotations(OnShutdown.class, org.apache.nifi.processor.annotation.OnShutdown.class, reportingTask, new MockProcessorLogger(), context);
+ ReflectionUtils.quietlyInvokeMethodsWithAnnotations(OnShutdown.class, org.apache.nifi.processor.annotation.OnShutdown.class, reportingTask, new MockComponentLogger(), context);
}
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/1bd2cf0d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockComponentLogger.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockComponentLogger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockComponentLogger.java
new file mode 100644
index 0000000..dd2f1b3
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockComponentLogger.java
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.documentation.mock;
+
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.logging.LogLevel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Stubs out the functionality of a ComponentLog so that it can
+ * be used during initialization of a component.
+ *
+ */
+public class MockComponentLogger implements ComponentLog {
+
+ private static final Logger logger = LoggerFactory
+ .getLogger(MockComponentLogger.class);
+
+ @Override
+ public void warn(String msg, Throwable t) {
+ logger.warn(msg, t);
+ }
+
+ @Override
+ public void warn(String msg, Object[] os) {
+ logger.warn(msg, os);
+ }
+
+ @Override
+ public void warn(String msg, Object[] os, Throwable t) {
+ logger.warn(msg, os);
+ logger.warn("", t);
+ }
+
+ @Override
+ public void warn(String msg) {
+ logger.warn(msg);
+ }
+
+ @Override
+ public void trace(String msg, Throwable t) {
+ logger.trace(msg, t);
+ }
+
+ @Override
+ public void trace(String msg, Object[] os) {
+ logger.trace(msg, os);
+ }
+
+ @Override
+ public void trace(String msg) {
+ logger.trace(msg);
+ }
+
+ @Override
+ public void trace(String msg, Object[] os, Throwable t) {
+ logger.trace(msg, os);
+ logger.trace("", t);
+ }
+
+ @Override
+ public boolean isWarnEnabled() {
+ return logger.isWarnEnabled();
+ }
+
+ @Override
+ public boolean isTraceEnabled() {
+ return logger.isTraceEnabled();
+ }
+
+ @Override
+ public boolean isInfoEnabled() {
+ return logger.isInfoEnabled();
+ }
+
+ @Override
+ public boolean isErrorEnabled() {
+ return logger.isErrorEnabled();
+ }
+
+ @Override
+ public boolean isDebugEnabled() {
+ return logger.isDebugEnabled();
+ }
+
+ @Override
+ public void info(String msg, Throwable t) {
+ logger.info(msg, t);
+ }
+
+ @Override
+ public void info(String msg, Object[] os) {
+ logger.info(msg, os);
+ }
+
+ @Override
+ public void info(String msg) {
+ logger.info(msg);
+
+ }
+
+ @Override
+ public void info(String msg, Object[] os, Throwable t) {
+ // Log at INFO (not TRACE) to match the other info(...) overloads, so log(LogLevel.INFO, msg, os, t) lands at the requested level.
+ logger.info(msg, os);
+ logger.info("", t);
+ }
+
+ @Override
+ public String getName() {
+ return logger.getName();
+ }
+
+ @Override
+ public void error(String msg, Throwable t) {
+ logger.error(msg, t);
+ }
+
+ @Override
+ public void error(String msg, Object[] os) {
+ logger.error(msg, os);
+ }
+
+ @Override
+ public void error(String msg) {
+ logger.error(msg);
+ }
+
+ @Override
+ public void error(String msg, Object[] os, Throwable t) {
+ logger.error(msg, os);
+ logger.error("", t);
+ }
+
+ @Override
+ public void debug(String msg, Throwable t) {
+ logger.debug(msg, t);
+ }
+
+ @Override
+ public void debug(String msg, Object[] os) {
+ logger.debug(msg, os);
+ }
+
+ @Override
+ public void debug(String msg, Object[] os, Throwable t) {
+ logger.debug(msg, os);
+ logger.debug("", t);
+ }
+
+ @Override
+ public void debug(String msg) {
+ logger.debug(msg);
+ }
+
+ @Override
+ public void log(LogLevel level, String msg, Throwable t) {
+ switch (level) {
+ case DEBUG:
+ debug(msg, t);
+ break;
+ case ERROR:
+ case FATAL:
+ error(msg, t);
+ break;
+ case INFO:
+ info(msg, t);
+ break;
+ case TRACE:
+ trace(msg, t);
+ break;
+ case WARN:
+ warn(msg, t);
+ break;
+ }
+ }
+
+ @Override
+ public void log(LogLevel level, String msg, Object[] os) {
+ switch (level) {
+ case DEBUG:
+ debug(msg, os);
+ break;
+ case ERROR:
+ case FATAL:
+ error(msg, os);
+ break;
+ case INFO:
+ info(msg, os);
+ break;
+ case TRACE:
+ trace(msg, os);
+ break;
+ case WARN:
+ warn(msg, os);
+ break;
+ }
+ }
+
+ @Override
+ public void log(LogLevel level, String msg) {
+ switch (level) {
+ case DEBUG:
+ debug(msg);
+ break;
+ case ERROR:
+ case FATAL:
+ error(msg);
+ break;
+ case INFO:
+ info(msg);
+ break;
+ case TRACE:
+ trace(msg);
+ break;
+ case WARN:
+ warn(msg);
+ break;
+ }
+ }
+
+ @Override
+ public void log(LogLevel level, String msg, Object[] os, Throwable t) {
+ switch (level) {
+ case DEBUG:
+ debug(msg, os, t);
+ break;
+ case ERROR:
+ case FATAL:
+ error(msg, os, t);
+ break;
+ case INFO:
+ info(msg, os, t);
+ break;
+ case TRACE:
+ trace(msg, os, t);
+ break;
+ case WARN:
+ warn(msg, os, t);
+ break;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/1bd2cf0d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockControllerServiceInitializationContext.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockControllerServiceInitializationContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockControllerServiceInitializationContext.java
index c1cca26..abbde61 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockControllerServiceInitializationContext.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockControllerServiceInitializationContext.java
@@ -41,7 +41,7 @@ public class MockControllerServiceInitializationContext implements ControllerSer
@Override
public ComponentLog getLogger() {
- return new MockProcessorLogger();
+ return new MockComponentLogger();
}
@Override
http://git-wip-us.apache.org/repos/asf/nifi/blob/1bd2cf0d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockProcessorInitializationContext.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockProcessorInitializationContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockProcessorInitializationContext.java
index 4218250..e536471 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockProcessorInitializationContext.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockProcessorInitializationContext.java
@@ -35,7 +35,7 @@ public class MockProcessorInitializationContext implements ProcessorInitializati
@Override
public ComponentLog getLogger() {
- return new MockProcessorLogger();
+ return new MockComponentLogger();
}
@Override
http://git-wip-us.apache.org/repos/asf/nifi/blob/1bd2cf0d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockProcessorLogger.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockProcessorLogger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockProcessorLogger.java
deleted file mode 100644
index df9e673..0000000
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockProcessorLogger.java
+++ /dev/null
@@ -1,258 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.documentation.mock;
-
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.logging.LogLevel;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Stubs out the functionality of a ProcessorLog/ComponentLog so that it can
- * be used during initialization of a component.
- *
- */
-public class MockProcessorLogger implements ComponentLog {
-
- private static final Logger logger = LoggerFactory
- .getLogger(MockProcessorLogger.class);
-
- @Override
- public void warn(String msg, Throwable t) {
- logger.warn(msg, t);
- }
-
- @Override
- public void warn(String msg, Object[] os) {
- logger.warn(msg, os);
- }
-
- @Override
- public void warn(String msg, Object[] os, Throwable t) {
- logger.warn(msg, os);
- logger.warn("", t);
- }
-
- @Override
- public void warn(String msg) {
- logger.warn(msg);
- }
-
- @Override
- public void trace(String msg, Throwable t) {
- logger.trace(msg, t);
- }
-
- @Override
- public void trace(String msg, Object[] os) {
- logger.trace(msg, os);
- }
-
- @Override
- public void trace(String msg) {
- logger.trace(msg);
- }
-
- @Override
- public void trace(String msg, Object[] os, Throwable t) {
- logger.trace(msg, os);
- logger.trace("", t);
- }
-
- @Override
- public boolean isWarnEnabled() {
- return logger.isWarnEnabled();
- }
-
- @Override
- public boolean isTraceEnabled() {
- return logger.isTraceEnabled();
- }
-
- @Override
- public boolean isInfoEnabled() {
- return logger.isInfoEnabled();
- }
-
- @Override
- public boolean isErrorEnabled() {
- return logger.isErrorEnabled();
- }
-
- @Override
- public boolean isDebugEnabled() {
- return logger.isDebugEnabled();
- }
-
- @Override
- public void info(String msg, Throwable t) {
- logger.info(msg, t);
- }
-
- @Override
- public void info(String msg, Object[] os) {
- logger.info(msg, os);
- }
-
- @Override
- public void info(String msg) {
- logger.info(msg);
-
- }
-
- @Override
- public void info(String msg, Object[] os, Throwable t) {
- logger.trace(msg, os);
- logger.trace("", t);
-
- }
-
- @Override
- public String getName() {
- return logger.getName();
- }
-
- @Override
- public void error(String msg, Throwable t) {
- logger.error(msg, t);
- }
-
- @Override
- public void error(String msg, Object[] os) {
- logger.error(msg, os);
- }
-
- @Override
- public void error(String msg) {
- logger.error(msg);
- }
-
- @Override
- public void error(String msg, Object[] os, Throwable t) {
- logger.error(msg, os);
- logger.error("", t);
- }
-
- @Override
- public void debug(String msg, Throwable t) {
- logger.debug(msg, t);
- }
-
- @Override
- public void debug(String msg, Object[] os) {
- logger.debug(msg, os);
- }
-
- @Override
- public void debug(String msg, Object[] os, Throwable t) {
- logger.debug(msg, os);
- logger.debug("", t);
- }
-
- @Override
- public void debug(String msg) {
- logger.debug(msg);
- }
-
- @Override
- public void log(LogLevel level, String msg, Throwable t) {
- switch (level) {
- case DEBUG:
- debug(msg, t);
- break;
- case ERROR:
- case FATAL:
- error(msg, t);
- break;
- case INFO:
- info(msg, t);
- break;
- case TRACE:
- trace(msg, t);
- break;
- case WARN:
- warn(msg, t);
- break;
- }
- }
-
- @Override
- public void log(LogLevel level, String msg, Object[] os) {
- switch (level) {
- case DEBUG:
- debug(msg, os);
- break;
- case ERROR:
- case FATAL:
- error(msg, os);
- break;
- case INFO:
- info(msg, os);
- break;
- case TRACE:
- trace(msg, os);
- break;
- case WARN:
- warn(msg, os);
- break;
- }
- }
-
- @Override
- public void log(LogLevel level, String msg) {
- switch (level) {
- case DEBUG:
- debug(msg);
- break;
- case ERROR:
- case FATAL:
- error(msg);
- break;
- case INFO:
- info(msg);
- break;
- case TRACE:
- trace(msg);
- break;
- case WARN:
- warn(msg);
- break;
- }
- }
-
- @Override
- public void log(LogLevel level, String msg, Object[] os, Throwable t) {
- switch (level) {
- case DEBUG:
- debug(msg, os, t);
- break;
- case ERROR:
- case FATAL:
- error(msg, os, t);
- break;
- case INFO:
- info(msg, os, t);
- break;
- case TRACE:
- trace(msg, os, t);
- break;
- case WARN:
- warn(msg, os, t);
- break;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/nifi/blob/1bd2cf0d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockReportingInitializationContext.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockReportingInitializationContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockReportingInitializationContext.java
index abaa766..ebf59d6 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockReportingInitializationContext.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockReportingInitializationContext.java
@@ -62,6 +62,6 @@ public class MockReportingInitializationContext implements ReportingInitializati
@Override
public ComponentLog getLogger() {
- return new MockProcessorLogger();
+ return new MockComponentLogger();
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/1bd2cf0d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 4650c92..6b61b49 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -990,11 +990,11 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
Thread.currentThread().setContextClassLoader(detectedClassLoaderForType);
final Class<? extends Processor> processorClass = rawClass.asSubclass(Processor.class);
processor = processorClass.newInstance();
- final ComponentLog processorLogger = new SimpleProcessLogger(identifier, processor);
- final ProcessorInitializationContext ctx = new StandardProcessorInitializationContext(identifier, processorLogger, this);
+ final ComponentLog componentLogger = new SimpleProcessLogger(identifier, processor);
+ final ProcessorInitializationContext ctx = new StandardProcessorInitializationContext(identifier, componentLogger, this);
processor.initialize(ctx);
- LogRepositoryFactory.getRepository(identifier).setLogger(processorLogger);
+ LogRepositoryFactory.getRepository(identifier).setLogger(componentLogger);
return processor;
} catch (final Throwable t) {
throw new ProcessorInstantiationException(type, t);
http://git-wip-us.apache.org/repos/asf/nifi/blob/1bd2cf0d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessorInitializationContext.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessorInitializationContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessorInitializationContext.java
index e185331..ccd731d 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessorInitializationContext.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessorInitializationContext.java
@@ -26,9 +26,9 @@ public class StandardProcessorInitializationContext implements ProcessorInitiali
private final ComponentLog logger;
private final ControllerServiceProvider serviceProvider;
- public StandardProcessorInitializationContext(final String identifier, final ComponentLog processorLog, final ControllerServiceProvider serviceProvider) {
+ public StandardProcessorInitializationContext(final String identifier, final ComponentLog componentLog, final ControllerServiceProvider serviceProvider) {
this.identifier = identifier;
- this.logger = processorLog;
+ this.logger = componentLog;
this.serviceProvider = serviceProvider;
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/1bd2cf0d/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java
index 1b65af6..77ded36 100644
--- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java
+++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java
@@ -16,6 +16,19 @@
*/
package org.apache.nifi.processors.hive;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
@@ -27,7 +40,7 @@ import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.dbcp.hive.HiveDBCPService;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
-import org.apache.nifi.logging.ProcessorLog;
+import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
@@ -38,19 +51,6 @@ import org.apache.nifi.util.LongHolder;
import org.apache.nifi.util.StopWatch;
import org.apache.nifi.util.hive.HiveJdbcCommon;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.sql.Connection;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
@EventDriven
@InputRequirement(Requirement.INPUT_ALLOWED)
@Tags({"hive", "sql", "select", "jdbc", "query", "database"})
@@ -149,7 +149,7 @@ public class SelectHiveQL extends AbstractHiveQLProcessor {
}
}
- final ProcessorLog logger = getLogger();
+ final ComponentLog logger = getLogger();
final HiveDBCPService dbcpService = context.getProperty(HIVE_DBCP_SERVICE).asControllerService(HiveDBCPService.class);
final String selectQuery = context.getProperty(HIVEQL_SELECT_QUERY).evaluateAttributeExpressions(fileToProcess).getValue();
final String outputFormat = context.getProperty(HIVEQL_OUTPUT_FORMAT).getValue();
http://git-wip-us.apache.org/repos/asf/nifi/blob/1bd2cf0d/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisher.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisher.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisher.java
index f42a892..79dfce7 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisher.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisher.java
@@ -31,7 +31,7 @@ import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.logging.ProcessorLog;
+import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.stream.io.util.StreamDemarcator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -48,7 +48,7 @@ class KafkaPublisher implements Closeable {
private volatile long ackWaitTime = 30000;
- private volatile ProcessorLog processLog;
+ private volatile ComponentLog processLog;
/**
* Creates an instance of this class as well as the instance of the
@@ -177,10 +177,10 @@ class KafkaPublisher implements Closeable {
}
/**
- * Will set {@link ProcessorLog} as an additional logger to forward log
+ * Will set {@link ComponentLog} as an additional logger to forward log
* messages to NiFi bulletin
*/
- void setProcessLog(ProcessorLog processLog) {
+ void setProcessLog(ComponentLog processLog) {
this.processLog = processLog;
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/1bd2cf0d/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/AbstractKafkaProcessorLifecycelTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/AbstractKafkaProcessorLifecycelTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/AbstractKafkaProcessorLifecycelTest.java
deleted file mode 100644
index 6346ffd..0000000
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/AbstractKafkaProcessorLifecycelTest.java
+++ /dev/null
@@ -1,456 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.kafka.pubsub;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.when;
-
-import java.io.Closeable;
-import java.lang.reflect.Field;
-import java.util.List;
-import java.util.Random;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.nifi.annotation.lifecycle.OnStopped;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.logging.ProcessorLog;
-import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.ProcessSessionFactory;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.util.TestRunner;
-import org.apache.nifi.util.TestRunners;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-public class AbstractKafkaProcessorLifecycelTest {
-
- private final static Random random = new Random();
-
- @Test
- public void validateBaseProperties() throws Exception {
- TestRunner runner = TestRunners.newTestRunner(DummyProcessor.class);
- runner.setProperty(AbstractKafkaProcessor.BOOTSTRAP_SERVERS, "");
- runner.setProperty(AbstractKafkaProcessor.TOPIC, "foo");
- runner.setProperty(ConsumeKafka.CLIENT_ID, "foo");
-
- try {
- runner.assertValid();
- fail();
- } catch (AssertionError e) {
- assertTrue(e.getMessage().contains("must contain at least one character that is not white space"));
- }
-
- runner.setProperty(ConsumeKafka.BOOTSTRAP_SERVERS, "foo");
- try {
- runner.assertValid();
- fail();
- } catch (AssertionError e) {
- assertTrue(e.getMessage().contains("'bootstrap.servers' validated against 'foo' is invalid"));
- }
- runner.setProperty(ConsumeKafka.BOOTSTRAP_SERVERS, "foo:1234");
-
- runner.removeProperty(ConsumeKafka.TOPIC);
- try {
- runner.assertValid();
- fail();
- } catch (AssertionError e) {
- assertTrue(e.getMessage().contains("'topic' is invalid because topic is required"));
- }
-
- runner.setProperty(ConsumeKafka.TOPIC, "");
- try {
- runner.assertValid();
- fail();
- } catch (AssertionError e) {
- assertTrue(e.getMessage().contains("must contain at least one character that is not white space"));
- }
-
- runner.setProperty(ConsumeKafka.TOPIC, " ");
- try {
- runner.assertValid();
- fail();
- } catch (AssertionError e) {
- assertTrue(e.getMessage().contains("must contain at least one character that is not white space"));
- }
- runner.setProperty(ConsumeKafka.TOPIC, "blah");
-
- runner.removeProperty(ConsumeKafka.CLIENT_ID);
- try {
- runner.assertValid();
- fail();
- } catch (AssertionError e) {
- assertTrue(e.getMessage().contains("invalid because client.id is required"));
- }
-
- runner.setProperty(ConsumeKafka.CLIENT_ID, "");
- try {
- runner.assertValid();
- fail();
- } catch (AssertionError e) {
- assertTrue(e.getMessage().contains("must contain at least one character that is not white space"));
- }
-
- runner.setProperty(ConsumeKafka.CLIENT_ID, " ");
- try {
- runner.assertValid();
- fail();
- } catch (AssertionError e) {
- assertTrue(e.getMessage().contains("must contain at least one character that is not white space"));
- }
- runner.setProperty(ConsumeKafka.CLIENT_ID, "ghj");
-
- runner.setProperty(PublishKafka.KERBEROS_PRINCIPLE, "");
- try {
- runner.assertValid();
- fail();
- } catch (AssertionError e) {
- assertTrue(e.getMessage().contains("must contain at least one character that is not white space"));
- }
- runner.setProperty(PublishKafka.KERBEROS_PRINCIPLE, " ");
- try {
- runner.assertValid();
- fail();
- } catch (AssertionError e) {
- assertTrue(e.getMessage().contains("must contain at least one character that is not white space"));
- }
- }
-
- @Test
- @Ignore // just for extra sanity check
- public void validateConcurrencyWithRandomFailuresMultiple() throws Exception {
- for (int i = 0; i < 100; i++) {
- validateConcurrencyWithRandomFailures();
- }
- }
-
- @Test
- public void validateConcurrencyWithRandomFailures() throws Exception {
- ExecutorService processingExecutor = Executors.newFixedThreadPool(32);
- final AtomicInteger commitCounter = new AtomicInteger();
- final AtomicInteger rollbackCounter = new AtomicInteger();
- final AtomicInteger yieldCounter = new AtomicInteger();
-
- final ProcessSessionFactory sessionFactory = mock(ProcessSessionFactory.class);
- final ProcessSession session = mock(ProcessSession.class);
- when(sessionFactory.createSession()).thenReturn(session);
- doAnswer(new Answer<Void>() {
- @Override
- public Void answer(InvocationOnMock invocation) throws Throwable {
- commitCounter.incrementAndGet();
- return null;
- }
- }).when(session).commit();
- doAnswer(new Answer<Void>() {
- @Override
- public Void answer(InvocationOnMock invocation) throws Throwable {
- rollbackCounter.incrementAndGet();
- return null;
- }
- }).when(session).rollback(true);
-
- final ConcurrencyValidatingProcessor processor = spy(new ConcurrencyValidatingProcessor());
-
- int testCount = 1000;
- final CountDownLatch latch = new CountDownLatch(testCount);
- for (int i = 0; i < testCount; i++) {
- processingExecutor.execute(new Runnable() {
- @Override
- public void run() {
- ProcessContext context = mock(ProcessContext.class);
- doAnswer(new Answer<Void>() {
- @Override
- public Void answer(InvocationOnMock invocation) throws Throwable {
- yieldCounter.incrementAndGet();
- return null;
- }
- }).when(context).yield();
- if (random.nextInt(10) == 5) {
- when(context.getName()).thenReturn("fail");
- }
- try {
- processor.onTrigger(context, sessionFactory);
- } catch (Exception e) {
- fail();
- } finally {
- latch.countDown();
- }
- }
- });
- }
-
- assertTrue(latch.await(20000, TimeUnit.MILLISECONDS));
- processingExecutor.shutdown();
-
- System.out.println("SUCCESS: " + processor.successfulTriggers);
- System.out.println("FAILURE: " + processor.failedTriggers);
- System.out.println("INIT: " + processor.resourceReinitialized);
- System.out.println("YIELD CALLS: " + yieldCounter.get());
- System.out.println("COMMIT CALLS: " + commitCounter.get());
- System.out.println("ROLLBACK CALLS: " + rollbackCounter.get());
- System.out.println("Close CALLS: " + processor.closeCounter.get());
-
- /*
- * this has to be <= 1 since the last thread may come to finally block
- * after acceptTask flag has been reset at which point the close will
- * not be called (which is correct behavior since it will be invoked
- * explicitly by the life-cycle operations of a running processor).
- *
- * You can actually observe the =1 behavior in the next test where it is
- * always 0 close calls
- */
- int closeVsInitDiff = processor.resourceReinitialized.get() - processor.closeCounter.get();
- assertTrue(closeVsInitDiff <= 1);
-
- assertEquals(commitCounter.get(), processor.successfulTriggers.get());
- assertEquals(rollbackCounter.get(), processor.failedTriggers.get());
-
- assertEquals(testCount,
- processor.successfulTriggers.get() + processor.failedTriggers.get() + yieldCounter.get());
- }
-
- @Test
- public void validateConcurrencyWithAllSuccesses() throws Exception {
- ExecutorService processingExecutor = Executors.newFixedThreadPool(32);
- final AtomicInteger commitCounter = new AtomicInteger();
- final AtomicInteger rollbackCounter = new AtomicInteger();
- final AtomicInteger yieldCounter = new AtomicInteger();
-
- final ProcessSessionFactory sessionFactory = mock(ProcessSessionFactory.class);
- final ProcessSession session = mock(ProcessSession.class);
- when(sessionFactory.createSession()).thenReturn(session);
- doAnswer(new Answer<Void>() {
- @Override
- public Void answer(InvocationOnMock invocation) throws Throwable {
- commitCounter.incrementAndGet();
- return null;
- }
- }).when(session).commit();
- doAnswer(new Answer<Void>() {
- @Override
- public Void answer(InvocationOnMock invocation) throws Throwable {
- rollbackCounter.incrementAndGet();
- return null;
- }
- }).when(session).rollback(true);
-
- final ConcurrencyValidatingProcessor processor = spy(new ConcurrencyValidatingProcessor());
-
- int testCount = 1000;
- final CountDownLatch latch = new CountDownLatch(testCount);
- for (int i = 0; i < testCount; i++) {
- processingExecutor.execute(new Runnable() {
- @Override
- public void run() {
- ProcessContext context = mock(ProcessContext.class);
- doAnswer(new Answer<Void>() {
- @Override
- public Void answer(InvocationOnMock invocation) throws Throwable {
- yieldCounter.incrementAndGet();
- return null;
- }
- }).when(context).yield();
- try {
- processor.onTrigger(context, sessionFactory);
- } catch (Exception e) {
- fail();
- } finally {
- latch.countDown();
- }
- }
- });
- }
-
- assertTrue(latch.await(30000, TimeUnit.MILLISECONDS));
- processingExecutor.shutdown();
-
- System.out.println("SUCCESS: " + processor.successfulTriggers);
- System.out.println("FAILURE: " + processor.failedTriggers);
- System.out.println("INIT: " + processor.resourceReinitialized);
- System.out.println("YIELD CALLS: " + yieldCounter.get());
- System.out.println("COMMIT CALLS: " + commitCounter.get());
- System.out.println("ROLLBACK CALLS: " + rollbackCounter.get());
- System.out.println("Close CALLS: " + processor.closeCounter.get());
-
- /*
- * unlike previous test this one will always be 1 since there are no
- * failures
- */
- int closeVsInitDiff = processor.resourceReinitialized.get() - processor.closeCounter.get();
- assertEquals(1, closeVsInitDiff);
-
- assertEquals(commitCounter.get(), processor.successfulTriggers.get());
- assertEquals(rollbackCounter.get(), processor.failedTriggers.get());
-
- assertEquals(testCount,
- processor.successfulTriggers.get() + processor.failedTriggers.get() + yieldCounter.get());
- }
-
- @Test
- public void validateConcurrencyWithAllFailures() throws Exception {
- ExecutorService processingExecutor = Executors.newFixedThreadPool(32);
- final AtomicInteger commitCounter = new AtomicInteger();
- final AtomicInteger rollbackCounter = new AtomicInteger();
- final AtomicInteger yieldCounter = new AtomicInteger();
-
- final ProcessSessionFactory sessionFactory = mock(ProcessSessionFactory.class);
- final ProcessSession session = mock(ProcessSession.class);
- when(sessionFactory.createSession()).thenReturn(session);
- doAnswer(new Answer<Void>() {
- @Override
- public Void answer(InvocationOnMock invocation) throws Throwable {
- commitCounter.incrementAndGet();
- return null;
- }
- }).when(session).commit();
- doAnswer(new Answer<Void>() {
- @Override
- public Void answer(InvocationOnMock invocation) throws Throwable {
- rollbackCounter.incrementAndGet();
- return null;
- }
- }).when(session).rollback(true);
-
- final ConcurrencyValidatingProcessor processor = spy(new ConcurrencyValidatingProcessor());
-
- int testCount = 1000;
- final CountDownLatch latch = new CountDownLatch(testCount);
- for (int i = 0; i < testCount; i++) {
- processingExecutor.execute(new Runnable() {
- @Override
- public void run() {
- ProcessContext context = mock(ProcessContext.class);
- doAnswer(new Answer<Void>() {
- @Override
- public Void answer(InvocationOnMock invocation) throws Throwable {
- yieldCounter.incrementAndGet();
- return null;
- }
- }).when(context).yield();
- when(context.getName()).thenReturn("fail");
- try {
- processor.onTrigger(context, sessionFactory);
- } catch (Exception e) {
- fail();
- } finally {
- latch.countDown();
- }
- }
- });
- }
-
- assertTrue(latch.await(20000, TimeUnit.MILLISECONDS));
- processingExecutor.shutdown();
-
- System.out.println("SUCCESS: " + processor.successfulTriggers);
- System.out.println("FAILURE: " + processor.failedTriggers);
- System.out.println("INIT: " + processor.resourceReinitialized);
- System.out.println("YIELD CALLS: " + yieldCounter.get());
- System.out.println("COMMIT CALLS: " + commitCounter.get());
- System.out.println("ROLLBACK CALLS: " + rollbackCounter.get());
- System.out.println("Close CALLS: " + processor.closeCounter.get());
-
- /*
- * unlike previous test this one will always be 0 since all triggers are
- * failures
- */
- int closeVsInitDiff = processor.resourceReinitialized.get() - processor.closeCounter.get();
- assertEquals(0, closeVsInitDiff);
-
- assertEquals(commitCounter.get(), processor.successfulTriggers.get());
- assertEquals(rollbackCounter.get(), processor.failedTriggers.get());
-
- assertEquals(testCount,
- processor.successfulTriggers.get() + processor.failedTriggers.get() + yieldCounter.get());
- }
-
- /**
- *
- */
- public static class DummyProcessor extends AbstractKafkaProcessor<Closeable> {
- @Override
- protected boolean rendezvousWithKafka(ProcessContext context, ProcessSession session) throws ProcessException {
- return true;
- }
-
- @Override
- protected Closeable buildKafkaResource(ProcessContext context, ProcessSession session) throws ProcessException {
- return mock(Closeable.class);
- }
-
- @Override
- protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
- return SHARED_DESCRIPTORS;
- }
- }
-
-
- public static class ConcurrencyValidatingProcessor extends AbstractKafkaProcessor<Closeable> {
- final AtomicInteger failedTriggers = new AtomicInteger();
- final AtomicInteger successfulTriggers = new AtomicInteger();
- final AtomicInteger resourceReinitialized = new AtomicInteger();
- final AtomicInteger closeCounter = new AtomicInteger();
-
- ConcurrencyValidatingProcessor() {
- try {
- Field loggerField = AbstractSessionFactoryProcessor.class.getDeclaredField("logger");
- loggerField.setAccessible(true);
- loggerField.set(this, mock(ProcessorLog.class));
- } catch (Exception e) {
- throw new IllegalStateException(e);
- }
- }
-
- @Override
- @OnStopped
- public void close() {
- super.close();
- assertTrue(this.kafkaResource == null);
- closeCounter.incrementAndGet();
- }
-
- @Override
- protected boolean rendezvousWithKafka(ProcessContext context, ProcessSession session) {
- assertNotNull(this.kafkaResource);
- if ("fail".equals(context.getName())) {
- failedTriggers.incrementAndGet();
- throw new RuntimeException("Intentional");
- }
- this.successfulTriggers.incrementAndGet();
- return true;
- }
-
- @Override
- protected Closeable buildKafkaResource(ProcessContext context, ProcessSession session) throws ProcessException {
- this.resourceReinitialized.incrementAndGet();
- return mock(Closeable.class);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/nifi/blob/1bd2cf0d/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/AbstractKafkaProcessorLifecycleTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/AbstractKafkaProcessorLifecycleTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/AbstractKafkaProcessorLifecycleTest.java
new file mode 100644
index 0000000..d09be60
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/AbstractKafkaProcessorLifecycleTest.java
@@ -0,0 +1,456 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kafka.pubsub;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+import java.io.Closeable;
+import java.lang.reflect.Field;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessSessionFactory;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+public class AbstractKafkaProcessorLifecycleTest {
+
+ private final static Random random = new Random();
+
+ @Test
+ public void validateBaseProperties() throws Exception {
+ TestRunner runner = TestRunners.newTestRunner(DummyProcessor.class);
+ runner.setProperty(AbstractKafkaProcessor.BOOTSTRAP_SERVERS, "");
+ runner.setProperty(AbstractKafkaProcessor.TOPIC, "foo");
+ runner.setProperty(ConsumeKafka.CLIENT_ID, "foo");
+
+ try {
+ runner.assertValid();
+ fail();
+ } catch (AssertionError e) {
+ assertTrue(e.getMessage().contains("must contain at least one character that is not white space"));
+ }
+
+ runner.setProperty(ConsumeKafka.BOOTSTRAP_SERVERS, "foo");
+ try {
+ runner.assertValid();
+ fail();
+ } catch (AssertionError e) {
+ assertTrue(e.getMessage().contains("'bootstrap.servers' validated against 'foo' is invalid"));
+ }
+ runner.setProperty(ConsumeKafka.BOOTSTRAP_SERVERS, "foo:1234");
+
+ runner.removeProperty(ConsumeKafka.TOPIC);
+ try {
+ runner.assertValid();
+ fail();
+ } catch (AssertionError e) {
+ assertTrue(e.getMessage().contains("'topic' is invalid because topic is required"));
+ }
+
+ runner.setProperty(ConsumeKafka.TOPIC, "");
+ try {
+ runner.assertValid();
+ fail();
+ } catch (AssertionError e) {
+ assertTrue(e.getMessage().contains("must contain at least one character that is not white space"));
+ }
+
+ runner.setProperty(ConsumeKafka.TOPIC, " ");
+ try {
+ runner.assertValid();
+ fail();
+ } catch (AssertionError e) {
+ assertTrue(e.getMessage().contains("must contain at least one character that is not white space"));
+ }
+ runner.setProperty(ConsumeKafka.TOPIC, "blah");
+
+ runner.removeProperty(ConsumeKafka.CLIENT_ID);
+ try {
+ runner.assertValid();
+ fail();
+ } catch (AssertionError e) {
+ assertTrue(e.getMessage().contains("invalid because client.id is required"));
+ }
+
+ runner.setProperty(ConsumeKafka.CLIENT_ID, "");
+ try {
+ runner.assertValid();
+ fail();
+ } catch (AssertionError e) {
+ assertTrue(e.getMessage().contains("must contain at least one character that is not white space"));
+ }
+
+ runner.setProperty(ConsumeKafka.CLIENT_ID, " ");
+ try {
+ runner.assertValid();
+ fail();
+ } catch (AssertionError e) {
+ assertTrue(e.getMessage().contains("must contain at least one character that is not white space"));
+ }
+ runner.setProperty(ConsumeKafka.CLIENT_ID, "ghj");
+
+ runner.setProperty(PublishKafka.KERBEROS_PRINCIPLE, "");
+ try {
+ runner.assertValid();
+ fail();
+ } catch (AssertionError e) {
+ assertTrue(e.getMessage().contains("must contain at least one character that is not white space"));
+ }
+ runner.setProperty(PublishKafka.KERBEROS_PRINCIPLE, " ");
+ try {
+ runner.assertValid();
+ fail();
+ } catch (AssertionError e) {
+ assertTrue(e.getMessage().contains("must contain at least one character that is not white space"));
+ }
+ }
+
+ @Test
+ @Ignore // just for extra sanity check
+ public void validateConcurrencyWithRandomFailuresMultiple() throws Exception {
+ for (int i = 0; i < 100; i++) {
+ validateConcurrencyWithRandomFailures();
+ }
+ }
+
+ @Test
+ public void validateConcurrencyWithRandomFailures() throws Exception {
+ ExecutorService processingExecutor = Executors.newFixedThreadPool(32);
+ final AtomicInteger commitCounter = new AtomicInteger();
+ final AtomicInteger rollbackCounter = new AtomicInteger();
+ final AtomicInteger yieldCounter = new AtomicInteger();
+
+ final ProcessSessionFactory sessionFactory = mock(ProcessSessionFactory.class);
+ final ProcessSession session = mock(ProcessSession.class);
+ when(sessionFactory.createSession()).thenReturn(session);
+ doAnswer(new Answer<Void>() {
+ @Override
+ public Void answer(InvocationOnMock invocation) throws Throwable {
+ commitCounter.incrementAndGet();
+ return null;
+ }
+ }).when(session).commit();
+ doAnswer(new Answer<Void>() {
+ @Override
+ public Void answer(InvocationOnMock invocation) throws Throwable {
+ rollbackCounter.incrementAndGet();
+ return null;
+ }
+ }).when(session).rollback(true);
+
+ final ConcurrencyValidatingProcessor processor = spy(new ConcurrencyValidatingProcessor());
+
+ int testCount = 1000;
+ final CountDownLatch latch = new CountDownLatch(testCount);
+ for (int i = 0; i < testCount; i++) {
+ processingExecutor.execute(new Runnable() {
+ @Override
+ public void run() {
+ ProcessContext context = mock(ProcessContext.class);
+ doAnswer(new Answer<Void>() {
+ @Override
+ public Void answer(InvocationOnMock invocation) throws Throwable {
+ yieldCounter.incrementAndGet();
+ return null;
+ }
+ }).when(context).yield();
+ if (random.nextInt(10) == 5) {
+ when(context.getName()).thenReturn("fail");
+ }
+ try {
+ processor.onTrigger(context, sessionFactory);
+ } catch (Exception e) {
+ fail();
+ } finally {
+ latch.countDown();
+ }
+ }
+ });
+ }
+
+ assertTrue(latch.await(20000, TimeUnit.MILLISECONDS));
+ processingExecutor.shutdown();
+
+ System.out.println("SUCCESS: " + processor.successfulTriggers);
+ System.out.println("FAILURE: " + processor.failedTriggers);
+ System.out.println("INIT: " + processor.resourceReinitialized);
+ System.out.println("YIELD CALLS: " + yieldCounter.get());
+ System.out.println("COMMIT CALLS: " + commitCounter.get());
+ System.out.println("ROLLBACK CALLS: " + rollbackCounter.get());
+ System.out.println("Close CALLS: " + processor.closeCounter.get());
+
+ /*
+ * this has to be <= 1 since the last thread may come to finally block
+ * after acceptTask flag has been reset at which point the close will
+ * not be called (which is correct behavior since it will be invoked
+ * explicitly by the life-cycle operations of a running processor).
+ *
+ * You can actually observe the =1 behavior in the next test where it is
+ * always 0 close calls
+ */
+ int closeVsInitDiff = processor.resourceReinitialized.get() - processor.closeCounter.get();
+ assertTrue(closeVsInitDiff <= 1);
+
+ assertEquals(commitCounter.get(), processor.successfulTriggers.get());
+ assertEquals(rollbackCounter.get(), processor.failedTriggers.get());
+
+ assertEquals(testCount,
+ processor.successfulTriggers.get() + processor.failedTriggers.get() + yieldCounter.get());
+ }
+
+ @Test
+ public void validateConcurrencyWithAllSuccesses() throws Exception {
+ ExecutorService processingExecutor = Executors.newFixedThreadPool(32);
+ final AtomicInteger commitCounter = new AtomicInteger();
+ final AtomicInteger rollbackCounter = new AtomicInteger();
+ final AtomicInteger yieldCounter = new AtomicInteger();
+
+ final ProcessSessionFactory sessionFactory = mock(ProcessSessionFactory.class);
+ final ProcessSession session = mock(ProcessSession.class);
+ when(sessionFactory.createSession()).thenReturn(session);
+ doAnswer(new Answer<Void>() {
+ @Override
+ public Void answer(InvocationOnMock invocation) throws Throwable {
+ commitCounter.incrementAndGet();
+ return null;
+ }
+ }).when(session).commit();
+ doAnswer(new Answer<Void>() {
+ @Override
+ public Void answer(InvocationOnMock invocation) throws Throwable {
+ rollbackCounter.incrementAndGet();
+ return null;
+ }
+ }).when(session).rollback(true);
+
+ final ConcurrencyValidatingProcessor processor = spy(new ConcurrencyValidatingProcessor());
+
+ int testCount = 1000;
+ final CountDownLatch latch = new CountDownLatch(testCount);
+ for (int i = 0; i < testCount; i++) {
+ processingExecutor.execute(new Runnable() {
+ @Override
+ public void run() {
+ ProcessContext context = mock(ProcessContext.class);
+ doAnswer(new Answer<Void>() {
+ @Override
+ public Void answer(InvocationOnMock invocation) throws Throwable {
+ yieldCounter.incrementAndGet();
+ return null;
+ }
+ }).when(context).yield();
+ try {
+ processor.onTrigger(context, sessionFactory);
+ } catch (Exception e) {
+ fail();
+ } finally {
+ latch.countDown();
+ }
+ }
+ });
+ }
+
+ assertTrue(latch.await(30000, TimeUnit.MILLISECONDS));
+ processingExecutor.shutdown();
+
+ System.out.println("SUCCESS: " + processor.successfulTriggers);
+ System.out.println("FAILURE: " + processor.failedTriggers);
+ System.out.println("INIT: " + processor.resourceReinitialized);
+ System.out.println("YIELD CALLS: " + yieldCounter.get());
+ System.out.println("COMMIT CALLS: " + commitCounter.get());
+ System.out.println("ROLLBACK CALLS: " + rollbackCounter.get());
+ System.out.println("Close CALLS: " + processor.closeCounter.get());
+
+ /*
+ * unlike previous test this one will always be 1 since there are no
+ * failures
+ */
+ int closeVsInitDiff = processor.resourceReinitialized.get() - processor.closeCounter.get();
+ assertEquals(1, closeVsInitDiff);
+
+ assertEquals(commitCounter.get(), processor.successfulTriggers.get());
+ assertEquals(rollbackCounter.get(), processor.failedTriggers.get());
+
+ assertEquals(testCount,
+ processor.successfulTriggers.get() + processor.failedTriggers.get() + yieldCounter.get());
+ }
+
+ @Test
+ public void validateConcurrencyWithAllFailures() throws Exception {
+ ExecutorService processingExecutor = Executors.newFixedThreadPool(32);
+ final AtomicInteger commitCounter = new AtomicInteger();
+ final AtomicInteger rollbackCounter = new AtomicInteger();
+ final AtomicInteger yieldCounter = new AtomicInteger();
+
+ final ProcessSessionFactory sessionFactory = mock(ProcessSessionFactory.class);
+ final ProcessSession session = mock(ProcessSession.class);
+ when(sessionFactory.createSession()).thenReturn(session);
+ doAnswer(new Answer<Void>() {
+ @Override
+ public Void answer(InvocationOnMock invocation) throws Throwable {
+ commitCounter.incrementAndGet();
+ return null;
+ }
+ }).when(session).commit();
+ doAnswer(new Answer<Void>() {
+ @Override
+ public Void answer(InvocationOnMock invocation) throws Throwable {
+ rollbackCounter.incrementAndGet();
+ return null;
+ }
+ }).when(session).rollback(true);
+
+ final ConcurrencyValidatingProcessor processor = spy(new ConcurrencyValidatingProcessor());
+
+ int testCount = 1000;
+ final CountDownLatch latch = new CountDownLatch(testCount);
+ for (int i = 0; i < testCount; i++) {
+ processingExecutor.execute(new Runnable() {
+ @Override
+ public void run() {
+ ProcessContext context = mock(ProcessContext.class);
+ doAnswer(new Answer<Void>() {
+ @Override
+ public Void answer(InvocationOnMock invocation) throws Throwable {
+ yieldCounter.incrementAndGet();
+ return null;
+ }
+ }).when(context).yield();
+ when(context.getName()).thenReturn("fail");
+ try {
+ processor.onTrigger(context, sessionFactory);
+ } catch (Exception e) {
+ fail();
+ } finally {
+ latch.countDown();
+ }
+ }
+ });
+ }
+
+ assertTrue(latch.await(20000, TimeUnit.MILLISECONDS));
+ processingExecutor.shutdown();
+
+ System.out.println("SUCCESS: " + processor.successfulTriggers);
+ System.out.println("FAILURE: " + processor.failedTriggers);
+ System.out.println("INIT: " + processor.resourceReinitialized);
+ System.out.println("YIELD CALLS: " + yieldCounter.get());
+ System.out.println("COMMIT CALLS: " + commitCounter.get());
+ System.out.println("ROLLBACK CALLS: " + rollbackCounter.get());
+ System.out.println("Close CALLS: " + processor.closeCounter.get());
+
+ /*
+ * unlike previous test this one will always be 0 since all triggers are
+ * failures
+ */
+ int closeVsInitDiff = processor.resourceReinitialized.get() - processor.closeCounter.get();
+ assertEquals(0, closeVsInitDiff);
+
+ assertEquals(commitCounter.get(), processor.successfulTriggers.get());
+ assertEquals(rollbackCounter.get(), processor.failedTriggers.get());
+
+ assertEquals(testCount,
+ processor.successfulTriggers.get() + processor.failedTriggers.get() + yieldCounter.get());
+ }
+
+ /**
+ *
+ */
+ public static class DummyProcessor extends AbstractKafkaProcessor<Closeable> {
+ @Override
+ protected boolean rendezvousWithKafka(ProcessContext context, ProcessSession session) throws ProcessException {
+ return true;
+ }
+
+ @Override
+ protected Closeable buildKafkaResource(ProcessContext context, ProcessSession session) throws ProcessException {
+ return mock(Closeable.class);
+ }
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return SHARED_DESCRIPTORS;
+ }
+ }
+
+
+ public static class ConcurrencyValidatingProcessor extends AbstractKafkaProcessor<Closeable> {
+ final AtomicInteger failedTriggers = new AtomicInteger();
+ final AtomicInteger successfulTriggers = new AtomicInteger();
+ final AtomicInteger resourceReinitialized = new AtomicInteger();
+ final AtomicInteger closeCounter = new AtomicInteger();
+
+ ConcurrencyValidatingProcessor() {
+ try {
+ Field loggerField = AbstractSessionFactoryProcessor.class.getDeclaredField("logger");
+ loggerField.setAccessible(true);
+ loggerField.set(this, mock(ComponentLog.class));
+ } catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ @Override
+ @OnStopped
+ public void close() {
+ super.close();
+ assertTrue(this.kafkaResource == null);
+ closeCounter.incrementAndGet();
+ }
+
+ @Override
+ protected boolean rendezvousWithKafka(ProcessContext context, ProcessSession session) {
+ assertNotNull(this.kafkaResource);
+ if ("fail".equals(context.getName())) {
+ failedTriggers.incrementAndGet();
+ throw new RuntimeException("Intentional");
+ }
+ this.successfulTriggers.incrementAndGet();
+ return true;
+ }
+
+ @Override
+ protected Closeable buildKafkaResource(ProcessContext context, ProcessSession session) throws ProcessException {
+ this.resourceReinitialized.incrementAndGet();
+ return mock(Closeable.class);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/1bd2cf0d/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ExecuteScript.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ExecuteScript.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ExecuteScript.java
index 224c55f..b863a42 100644
--- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ExecuteScript.java
+++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ExecuteScript.java
@@ -30,8 +30,6 @@ import javax.script.ScriptException;
import javax.script.SimpleBindings;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.annotation.behavior.DynamicProperty;
-import org.apache.nifi.annotation.behavior.TriggerSerially;
-import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
http://git-wip-us.apache.org/repos/asf/nifi/blob/1bd2cf0d/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TransformJSON.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TransformJSON.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TransformJSON.java
index c8576aa..675b135 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TransformJSON.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TransformJSON.java
@@ -42,7 +42,7 @@ import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
-import org.apache.nifi.logging.ProcessorLog;
+import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
@@ -57,13 +57,13 @@ import org.apache.nifi.util.StopWatch;
import org.apache.nifi.util.StringUtils;
import com.bazaarvoice.jolt.CardinalityTransform;
-import com.bazaarvoice.jolt.Shiftr;
-import com.bazaarvoice.jolt.Removr;
import com.bazaarvoice.jolt.Chainr;
import com.bazaarvoice.jolt.Defaultr;
+import com.bazaarvoice.jolt.JsonUtils;
+import com.bazaarvoice.jolt.Removr;
+import com.bazaarvoice.jolt.Shiftr;
import com.bazaarvoice.jolt.Sortr;
import com.bazaarvoice.jolt.Transform;
-import com.bazaarvoice.jolt.JsonUtils;
@EventDriven
@SideEffectFree
@@ -175,7 +175,7 @@ public class TransformJSON extends AbstractProcessor {
return;
}
- final ProcessorLog logger = getLogger();
+ final ComponentLog logger = getLogger();
final StopWatch stopWatch = new StopWatch(true);
final byte[] originalContent = new byte[(int) original.getSize()];