You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2017/04/11 23:33:32 UTC
[09/19] nifi git commit: NIFI-1280: Refactoring to make more generic
so that other data types can be supported;
created InputStreams to content on-demand so that multiple passes can be made
over FlowFile content if required. Created new Controller Servic
NIFI-1280: Refactoring to make more generic so that other data types can be supported; created InputStreams to content on-demand so that multiple passes can be made over FlowFile content if required. Created new Controller Services for reading and writing specific data types
Signed-off-by: Matt Burgess <ma...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/a88d3bfa
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/a88d3bfa
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/a88d3bfa
Branch: refs/heads/master
Commit: a88d3bfa3c53d9cbe375f2b89eaa9248eb92df29
Parents: 4d5872a
Author: Mark Payne <ma...@hotmail.com>
Authored: Mon Jul 11 14:57:00 2016 -0400
Committer: Matt Burgess <ma...@apache.org>
Committed: Tue Apr 11 19:29:04 2017 -0400
----------------------------------------------------------------------
nifi-assembly/pom.xml | 59 +-
.../apache/nifi/util/MockProcessSession.java | 4 +-
.../nifi/cluster/manager/NodeResponse.java | 2 +-
.../repository/StandardProcessSession.java | 8 +-
.../nifi/processor/SimpleProcessLogger.java | 75 ++-
.../src/main/resources/META-INF/NOTICE | 16 +
.../nifi-standard-processors/pom.xml | 27 +-
.../calcite/adapter/csv/CsvEnumerator2.java | 303 -----------
.../apache/calcite/adapter/csv/CsvSchema2.java | 98 ----
.../calcite/adapter/csv/CsvSchemaFactory2.java | 53 --
.../calcite/adapter/csv/CsvTableScan2.java | 104 ----
.../adapter/csv/CsvTranslatableTable2.java | 121 -----
.../processors/standard/FilterCSVColumns.java | 258 ---------
.../nifi/processors/standard/QueryFlowFile.java | 541 +++++++++++++++++++
.../nifi/queryflowfile/FlowFileEnumerator.java | 150 +++++
.../FlowFileProjectTableScanRule.java | 76 +++
.../nifi/queryflowfile/FlowFileTable.java | 203 +++++++
.../nifi/queryflowfile/FlowFileTableScan.java | 91 ++++
.../org.apache.nifi.processor.Processor | 1 +
.../additionalDetails.html | 47 ++
.../standard/TestFilterCSVColumns.java | 117 ----
.../processors/standard/TestQueryFlowFile.java | 379 +++++++++++++
.../resources/TestFilterCSVColumns/Numeric.csv | 5 -
.../resources/TestFilterCSVColumns/US500.csv | 1 -
.../TestFilterCSVColumns/US500_typeless.csv | 1 -
.../pom.xml | 31 ++
.../nifi/serialization/DataTypeValidator.java | 82 +++
.../serialization/MalformedRecordException.java | 31 ++
.../apache/nifi/serialization/RecordReader.java | 55 ++
.../nifi/serialization/RecordSetWriter.java | 45 ++
.../serialization/RecordSetWriterFactory.java | 30 +
.../apache/nifi/serialization/RecordWriter.java | 41 ++
.../serialization/RowRecordReaderFactory.java | 33 ++
.../nifi/serialization/SimpleRecordSchema.java | 126 +++++
.../apache/nifi/serialization/WriteResult.java | 69 +++
.../nifi/serialization/record/DataType.java | 95 ++++
.../serialization/record/ListRecordSet.java | 44 ++
.../nifi/serialization/record/MapRecord.java | 322 +++++++++++
.../nifi/serialization/record/Record.java | 62 +++
.../nifi/serialization/record/RecordField.java | 64 +++
.../serialization/record/RecordFieldType.java | 114 ++++
.../nifi/serialization/record/RecordSchema.java | 58 ++
.../nifi/serialization/record/RecordSet.java | 53 ++
.../record/ResultSetRecordSet.java | 169 ++++++
.../record/TypeMismatchException.java | 28 +
.../pom.xml | 41 ++
.../src/main/resources/META-INF/LICENSE | 269 +++++++++
.../src/main/resources/META-INF/NOTICE | 77 +++
.../.gitignore | 1 +
.../nifi-record-serialization-services/pom.xml | 94 ++++
.../java/org/apache/nifi/avro/AvroReader.java | 40 ++
.../org/apache/nifi/avro/AvroRecordReader.java | 254 +++++++++
.../apache/nifi/avro/AvroRecordSetWriter.java | 67 +++
.../apache/nifi/avro/AvroSchemaValidator.java | 45 ++
.../org/apache/nifi/avro/WriteAvroResult.java | 286 ++++++++++
.../java/org/apache/nifi/csv/CSVReader.java | 49 ++
.../org/apache/nifi/csv/CSVRecordReader.java | 216 ++++++++
.../org/apache/nifi/csv/CSVRecordSetWriter.java | 37 ++
.../org/apache/nifi/csv/WriteCSVResult.java | 127 +++++
.../nifi/grok/GrokExpressionValidator.java | 48 ++
.../java/org/apache/nifi/grok/GrokReader.java | 99 ++++
.../org/apache/nifi/grok/GrokRecordReader.java | 323 +++++++++++
.../nifi/json/AbstractJsonRowRecordReader.java | 307 +++++++++++
.../org/apache/nifi/json/JsonPathReader.java | 126 +++++
.../nifi/json/JsonPathRowRecordReader.java | 280 ++++++++++
.../org/apache/nifi/json/JsonPathValidator.java | 60 ++
.../apache/nifi/json/JsonRecordSetWriter.java | 66 +++
.../org/apache/nifi/json/JsonTreeReader.java | 56 ++
.../nifi/json/JsonTreeRowRecordReader.java | 115 ++++
.../org/apache/nifi/json/PropertyNameUtil.java | 88 +++
.../org/apache/nifi/json/WriteJsonResult.java | 309 +++++++++++
.../serialization/AbstractRecordSetWriter.java | 84 +++
.../nifi/serialization/DataTypeUtils.java | 165 ++++++
.../SimpleDateFormatValidator.java | 48 ++
.../UserTypeOverrideRowReader.java | 78 +++
.../nifi/text/FreeFormTextRecordSetWriter.java | 80 +++
.../apache/nifi/text/FreeFormTextWriter.java | 99 ++++
...org.apache.nifi.controller.ControllerService | 28 +
.../main/resources/default-grok-patterns.txt | 115 ++++
.../additionalDetails.html | 185 +++++++
.../additionalDetails.html | 396 ++++++++++++++
.../additionalDetails.html | 227 ++++++++
.../additionalDetails.html | 102 ++++
.../apache/nifi/avro/TestAvroRecordReader.java | 221 ++++++++
.../apache/nifi/csv/TestCSVRecordReader.java | 122 +++++
.../org/apache/nifi/csv/TestWriteCSVResult.java | 121 +++++
.../apache/nifi/grok/TestGrokRecordReader.java | 190 +++++++
.../nifi/json/TestJsonPathRowRecordReader.java | 292 ++++++++++
.../nifi/json/TestJsonTreeRowRecordReader.java | 266 +++++++++
.../apache/nifi/json/TestWriteJsonResult.java | 102 ++++
.../test/resources/csv/extra-white-space.csv | 9 +
.../test/resources/csv/multi-bank-account.csv | 3 +
.../test/resources/csv/single-bank-account.csv | 2 +
.../resources/grok/error-with-stack-trace.log | 25 +
...ifi-log-sample-multiline-with-stacktrace.log | 29 +
.../src/test/resources/grok/nifi-log-sample.log | 5 +
.../resources/grok/single-line-log-messages.txt | 5 +
.../bank-account-array-different-schemas.json | 30 +
.../bank-account-array-optional-balance.json | 29 +
.../test/resources/json/bank-account-array.json | 21 +
.../test/resources/json/json-with-unicode.json | 9 +
.../test/resources/json/output/dataTypes.json | 18 +
.../resources/json/primitive-type-array.json | 13 +
.../resources/json/single-bank-account.json | 10 +
.../json/single-element-nested-array.json | 16 +
.../resources/json/single-element-nested.json | 13 +
.../pom.xml | 30 +
.../nifi-standard-services-api-nar/pom.xml | 5 +
nifi-nar-bundles/nifi-standard-services/pom.xml | 2 +
pom.xml | 11 +
110 files changed, 9838 insertions(+), 1140 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-assembly/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml
index 070beba..4a2babb 100755
--- a/nifi-assembly/pom.xml
+++ b/nifi-assembly/pom.xml
@@ -1,15 +1,16 @@
<?xml version="1.0" encoding="UTF-8"?>
-<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
-license agreements. See the NOTICE file distributed with this work for additional
-information regarding copyright ownership. The ASF licenses this file to
-You under the Apache License, Version 2.0 (the "License"); you may not use
-this file except in compliance with the License. You may obtain a copy of
-the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
-by applicable law or agreed to in writing, software distributed under the
-License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
-OF ANY KIND, either express or implied. See the License for the specific
-language governing permissions and limitations under the License. -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
+ license agreements. See the NOTICE file distributed with this work for additional
+ information regarding copyright ownership. The ASF licenses this file to
+ You under the Apache License, Version 2.0 (the "License"); you may not use
+ this file except in compliance with the License. You may obtain a copy of
+ the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+ by applicable law or agreed to in writing, software distributed under the
+ License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
+ OF ANY KIND, either express or implied. See the License for the specific
+ language governing permissions and limitations under the License. -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
@@ -405,6 +406,11 @@ language governing permissions and limitations under the License. -->
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-record-serialization-services-nar</artifactId>
+ <type>nar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
<artifactId>nifi-mqtt-nar</artifactId>
<type>nar</type>
</dependency>
@@ -513,13 +519,17 @@ language governing permissions and limitations under the License. -->
<configuration>
<name>nifi</name>
<summary>Apache NiFi</summary>
- <description>Apache NiFi is dataflow system based on the Flow-Based Programming concepts.</description>
- <license>Apache License, Version 2.0 and others (see included LICENSE file)</license>
+ <description>Apache NiFi is dataflow system
+ based on the Flow-Based Programming
+ concepts.</description>
+ <license>Apache License, Version 2.0 and
+ others (see included LICENSE file)</license>
<url>http://nifi.apache.org</url>
<group>Utilities</group>
<prefix>/opt/nifi</prefix>
<defineStatements>
- <defineStatement>_use_internal_dependency_generator 0</defineStatement>
+ <defineStatement>_use_internal_dependency_generator
+ 0</defineStatement>
</defineStatements>
<defaultDirmode>750</defaultDirmode>
<defaultFilemode>640</defaultFilemode>
@@ -536,7 +546,13 @@ language governing permissions and limitations under the License. -->
</installScriptlet>
<preinstallScriptlet>
<script>
- /usr/bin/getent group nifi >/dev/null || /usr/sbin/groupadd -r nifi; /usr/bin/getent passwd nifi >/dev/null || /usr/sbin/useradd -r -g nifi -d /opt/nifi -s /sbin/nologin -c "NiFi System User" nifi
+ /usr/bin/getent group nifi
+ >/dev/null || /usr/sbin/groupadd
+ -r nifi; /usr/bin/getent passwd nifi
+ >/dev/null || /usr/sbin/useradd
+ -r -g nifi -d /opt/nifi -s
+ /sbin/nologin -c "NiFi System User"
+ nifi
</script>
</preinstallScriptlet>
</configuration>
@@ -602,10 +618,12 @@ language governing permissions and limitations under the License. -->
<mapping>
<directory>/opt/nifi/nifi-${project.version}/lib</directory>
</mapping>
- <!-- The lib excludes and lib/bootstrap includes are computed by looking at the desired contents of
- lib vs the desired contents of bootstrap directories. The bootstrap directory should be comprised of explicitly
- included items as found from the lib/bootstrap of a non rpm build and the lib folder should be specific excludes
- being those which we want in bootstrap and NOT in lib. -->
+ <!-- The lib excludes and lib/bootstrap
+ includes are computed by looking at the desired contents of lib vs the desired
+ contents of bootstrap directories. The bootstrap directory should be comprised
+ of explicitly included items as found from the lib/bootstrap of a non rpm
+ build and the lib folder should be specific excludes being those which we
+ want in bootstrap and NOT in lib. -->
<mapping>
<directory>/opt/nifi/nifi-${project.version}/lib</directory>
<dependency>
@@ -636,7 +654,8 @@ language governing permissions and limitations under the License. -->
<!-- must be in lib <exclude>ch.qos.logback:logback-core</exclude> -->
<exclude>org.apache.nifi:nifi-security-utils</exclude>
<exclude>org.apache.nifi:nifi-utils</exclude>
- <!-- Items to not include which are also not in bootstrap -->
+ <!-- Items to not include
+ which are also not in bootstrap -->
<exclude>org.apache.nifi:nifi-resources</exclude>
<exclude>org.apache.nifi:nifi-docs</exclude>
</excludes>
http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
----------------------------------------------------------------------
diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
index faf6e42..7dd9714 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
@@ -218,8 +218,8 @@ public class MockProcessSession implements ProcessSession {
}
}
- throw new FlowFileHandlingException("Cannot commit session because the following Input Streams were created via "
- + "calls to ProcessSession.read(FlowFile) and never closed: " + openStreamCopy);
+ // throw new FlowFileHandlingException("Cannot commit session because the following Input Streams were created via "
+ // + "calls to ProcessSession.read(FlowFile) and never closed: " + openStreamCopy);
}
committed = true;
http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/NodeResponse.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/NodeResponse.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/NodeResponse.java
index 7c911b8..73dd92f 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/NodeResponse.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/NodeResponse.java
@@ -239,7 +239,7 @@ public class NodeResponse {
// if no client response was created, then generate a 500 response
if (hasThrowable()) {
- return Response.status(Status.INTERNAL_SERVER_ERROR).build();
+ return Response.status(Status.INTERNAL_SERVER_ERROR).entity(getThrowable().toString()).build();
}
// set the status
http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index fe99fb3..3a51816 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -2157,10 +2157,11 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
final InputStream rawIn = getInputStream(source, record.getCurrentClaim(), record.getCurrentClaimOffset(), false);
final InputStream limitedIn = new LimitedInputStream(rawIn, source.getSize());
- final ByteCountingInputStream countingStream = new ByteCountingInputStream(limitedIn, this.bytesRead);
+ final ByteCountingInputStream countingStream = new ByteCountingInputStream(limitedIn);
final FlowFileAccessInputStream ffais = new FlowFileAccessInputStream(countingStream, source, record.getCurrentClaim());
final InputStream errorHandlingStream = new InputStream() {
+ private boolean closed = false;
@Override
public int read() throws IOException {
@@ -2201,7 +2202,10 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
@Override
public void close() throws IOException {
- StandardProcessSession.this.bytesRead += countingStream.getBytesRead();
+ if (!closed) {
+ StandardProcessSession.this.bytesRead += countingStream.getBytesRead();
+ closed = true;
+ }
ffais.close();
openInputStreams.remove(source);
http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/SimpleProcessLogger.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/SimpleProcessLogger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/SimpleProcessLogger.java
index cc17abc..8e92604 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/SimpleProcessLogger.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/SimpleProcessLogger.java
@@ -16,11 +16,13 @@
*/
package org.apache.nifi.processor;
+import java.util.Arrays;
+
+import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.logging.LogLevel;
import org.apache.nifi.logging.LogRepository;
import org.apache.nifi.logging.LogRepositoryFactory;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,16 +49,6 @@ public class SimpleProcessLogger implements ComponentLog {
return newArgs;
}
- private Object[] translateException(final Object[] os) {
- if (os != null && os.length > 0 && (os[os.length - 1] instanceof Throwable)) {
- final Object[] osCopy = new Object[os.length];
- osCopy[osCopy.length - 1] = os[os.length - 1].toString();
- System.arraycopy(os, 0, osCopy, 0, os.length - 1);
- return osCopy;
- }
- return os;
- }
-
private boolean lastArgIsException(final Object[] os) {
return (os != null && os.length > 0 && (os[os.length - 1] instanceof Throwable));
}
@@ -80,7 +72,7 @@ public class SimpleProcessLogger implements ComponentLog {
}
if (lastArgIsException(os)) {
- warn(msg, translateException(os), (Throwable) os[os.length - 1]);
+ warn(msg, Arrays.copyOfRange(os, 0, os.length - 1), (Throwable) os[os.length - 1]);
} else {
msg = "{} " + msg;
os = addProcessor(os);
@@ -95,13 +87,9 @@ public class SimpleProcessLogger implements ComponentLog {
return;
}
- os = addProcessorAndThrowable(os, t);
+ os = addProcessorAndThrowable(os, t, logger.isDebugEnabled());
msg = "{} " + msg + ": {}";
-
logger.warn(msg, os);
- if (logger.isDebugEnabled()) {
- logger.warn("", t);
- }
logRepository.addLogMessage(LogLevel.WARN, msg, os, t);
}
@@ -159,11 +147,10 @@ public class SimpleProcessLogger implements ComponentLog {
return;
}
- os = addProcessorAndThrowable(os, t);
+ os = addProcessorAndThrowable(os, t, true);
msg = "{} " + msg + ": {}";
logger.trace(msg, os);
- logger.trace("", t);
logRepository.addLogMessage(LogLevel.TRACE, msg, os, t);
}
@@ -240,13 +227,10 @@ public class SimpleProcessLogger implements ComponentLog {
return;
}
- os = addProcessorAndThrowable(os, t);
+ os = addProcessorAndThrowable(os, t, logger.isDebugEnabled());
msg = "{} " + msg + ": {}";
logger.info(msg, os);
- if (logger.isDebugEnabled()) {
- logger.info("", t);
- }
logRepository.addLogMessage(LogLevel.INFO, msg, os, t);
}
@@ -261,14 +245,16 @@ public class SimpleProcessLogger implements ComponentLog {
return;
}
- msg = "{} " + msg;
- Object[] os = t == null ? new Object[]{component} : new Object[]{component, t.toString()};
- logger.error(msg, os);
- if (t != null){
- logger.error("", t);
- logRepository.addLogMessage(LogLevel.ERROR, msg, os, t);
- } else {
+ if (t == null) {
+ msg = "{} " + msg;
+ final Object[] os = new Object[] {component};
+ logger.error(msg, os);
logRepository.addLogMessage(LogLevel.ERROR, msg, os);
+ } else {
+ msg = "{} " + msg + ": {}";
+ final Object[] os = new Object[] {component, t.toString(), t};
+ logger.error(msg, os);
+ logRepository.addLogMessage(LogLevel.ERROR, msg, os, t);
}
}
@@ -279,7 +265,7 @@ public class SimpleProcessLogger implements ComponentLog {
}
if (lastArgIsException(os)) {
- error(msg, translateException(os), (Throwable) os[os.length - 1]);
+ error(msg, Arrays.copyOfRange(os, 0, os.length - 1), (Throwable) os[os.length - 1]);
} else {
os = addProcessor(os);
msg = "{} " + msg;
@@ -299,21 +285,27 @@ public class SimpleProcessLogger implements ComponentLog {
return;
}
- os = addProcessorAndThrowable(os, t);
+ os = addProcessorAndThrowable(os, t, true);
msg = "{} " + msg + ": {}";
logger.error(msg, os);
- logger.error("", t);
logRepository.addLogMessage(LogLevel.ERROR, msg, os, t);
}
- private Object[] addProcessorAndThrowable(final Object[] os, final Throwable t) {
- final Object[] modifiedArgs = new Object[os.length + 2];
- modifiedArgs[0] = component.toString();
- for (int i = 0; i < os.length; i++) {
- modifiedArgs[i + 1] = os[i];
+ private Object[] addProcessorAndThrowable(final Object[] os, final Throwable t, final boolean includeStackTrace) {
+ final Object[] modifiedArgs;
+ if (t == null || !includeStackTrace) {
+ modifiedArgs = new Object[os.length + 2];
+ modifiedArgs[0] = component.toString();
+ System.arraycopy(os, 0, modifiedArgs, 1, os.length);
+ modifiedArgs[modifiedArgs.length - 1] = StringUtils.EMPTY;
+ } else {
+ modifiedArgs = new Object[os.length + 3];
+ modifiedArgs[0] = component.toString();
+ System.arraycopy(os, 0, modifiedArgs, 1, os.length);
+ modifiedArgs[modifiedArgs.length - 2] = t.toString();
+ modifiedArgs[modifiedArgs.length - 1] = t;
}
- modifiedArgs[modifiedArgs.length - 1] = (t == null) ? "" : t.toString();
return modifiedArgs;
}
@@ -350,13 +342,10 @@ public class SimpleProcessLogger implements ComponentLog {
return;
}
- os = addProcessorAndThrowable(os, t);
+ os = addProcessorAndThrowable(os, t, true);
msg = "{} " + msg + ": {}";
logger.debug(msg, os);
- if (logger.isDebugEnabled()) {
- logger.debug("", t);
- }
logRepository.addLogMessage(LogLevel.DEBUG, msg, os, t);
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-nar/src/main/resources/META-INF/NOTICE
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-nar/src/main/resources/META-INF/NOTICE
index e0d1300..51c6080 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-nar/src/main/resources/META-INF/NOTICE
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-nar/src/main/resources/META-INF/NOTICE
@@ -178,6 +178,22 @@ The following binary components are provided under the Apache Software License v
Grok
Copyright 2014 Anthony Corbacho, and contributors.
+ (ASLv2) Apache Calcite
+ The following NOTICE information applies:
+ Apache Calcite
+ Copyright 2012-2017 The Apache Software Foundation
+
+ This product includes software developed at
+ The Apache Software Foundation (http://www.apache.org/).
+
+ This product is based on source code originally developed
+ by DynamoBI Corporation, LucidEra Inc., SQLstream Inc. and others
+ under the auspices of the Eigenbase Foundation
+ and released as the LucidDB project.
+
+ The web site includes files generated by Jekyll.
+
+
************************
Common Development and Distribution License 1.1
************************
http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
index d410f43..e390097 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
@@ -1,15 +1,16 @@
<?xml version="1.0"?>
-<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
-license agreements. See the NOTICE file distributed with this work for additional
-information regarding copyright ownership. The ASF licenses this file to
-You under the Apache License, Version 2.0 (the "License"); you may not use
-this file except in compliance with the License. You may obtain a copy of
-the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
-by applicable law or agreed to in writing, software distributed under the
-License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
-OF ANY KIND, either express or implied. See the License for the specific
-language governing permissions and limitations under the License. -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
+ license agreements. See the NOTICE file distributed with this work for additional
+ information regarding copyright ownership. The ASF licenses this file to
+ You under the Apache License, Version 2.0 (the "License"); you may not use
+ this file except in compliance with the License. You may obtain a copy of
+ the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+ by applicable law or agreed to in writing, software distributed under the
+ License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
+ OF ANY KIND, either express or implied. See the License for the specific
+ language governing permissions and limitations under the License. -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
@@ -49,6 +50,10 @@ language governing permissions and limitations under the License. -->
<artifactId>nifi-http-context-map-api</artifactId>
</dependency>
<dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-record-serialization-service-api</artifactId>
+ </dependency>
+ <dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/calcite/adapter/csv/CsvEnumerator2.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/calcite/adapter/csv/CsvEnumerator2.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/calcite/adapter/csv/CsvEnumerator2.java
deleted file mode 100644
index 0f928ce..0000000
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/calcite/adapter/csv/CsvEnumerator2.java
+++ /dev/null
@@ -1,303 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to you under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.calcite.adapter.csv;
-
-import java.io.IOException;
-import java.text.ParseException;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
-import java.util.TimeZone;
-
-import org.apache.calcite.adapter.java.JavaTypeFactory;
-import org.apache.calcite.linq4j.Enumerator;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.util.Pair;
-import org.apache.commons.lang3.time.FastDateFormat;
-
-import au.com.bytecode.opencsv.CSVReader;
-
-
-/** Enumerator that reads from a CSV stream.
- *
- * @param <E> Row type
- */
-class CsvEnumerator2<E> implements Enumerator<E> {
- private final CSVReader reader;
- private final String[] filterValues;
- private final RowConverter<E> rowConverter;
- private E current;
-
- private static final FastDateFormat TIME_FORMAT_DATE;
- private static final FastDateFormat TIME_FORMAT_TIME;
- private static final FastDateFormat TIME_FORMAT_TIMESTAMP;
-
- static {
- TimeZone gmt = TimeZone.getTimeZone("GMT");
- TIME_FORMAT_DATE = FastDateFormat.getInstance("yyyy-MM-dd", gmt);
- TIME_FORMAT_TIME = FastDateFormat.getInstance("HH:mm:ss", gmt);
- TIME_FORMAT_TIMESTAMP =
- FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss", gmt);
- }
-
- public CsvEnumerator2(CSVReader csvReader, List<CsvFieldType> fieldTypes) {
- this(verifyNotNullReader(csvReader), fieldTypes, identityList(fieldTypes.size()));
- }
-
- public CsvEnumerator2(CSVReader csvReader, List<CsvFieldType> fieldTypes, int[] fields) {
- //noinspection unchecked
- this(csvReader, null, (RowConverter<E>) converter(fieldTypes, fields));
- }
-
- public CsvEnumerator2(CSVReader csvReader, String[] filterValues, RowConverter<E> rowConverter) {
- this.rowConverter = rowConverter;
- this.filterValues = filterValues;
- this.reader = csvReader;
- }
-
- static public CSVReader verifyNotNullReader(CSVReader csvReader) {
- if (csvReader==null)
- throw new IllegalArgumentException("csvReader cannot be null");
- return csvReader;
- }
-
- private static RowConverter<?> converter(List<CsvFieldType> fieldTypes,
- int[] fields) {
- if (fields.length == 1) {
- final int field = fields[0];
- return new SingleColumnRowConverter(fieldTypes.get(field), field);
- } else {
- return new ArrayRowConverter(fieldTypes, fields);
- }
- }
-
- /** Deduces the names and types of a table's columns by reading the first line
- * of a CSV stream. */
- static public RelDataType deduceRowType(JavaTypeFactory typeFactory, String[] firstLine,
- List<CsvFieldType> fieldTypes) {
- final List<RelDataType> types = new ArrayList<>();
- final List<String> names = new ArrayList<>();
- for (String string : firstLine) {
- final String name;
- final CsvFieldType fieldType;
- final int colon = string.indexOf(':');
- if (colon >= 0) {
- name = string.substring(0, colon);
- String typeString = string.substring(colon + 1);
- typeString = typeString.trim();
- fieldType = CsvFieldType.of(typeString);
- if (fieldType == null) {
- System.out.println("WARNING: Found unknown type: "
- + typeString + " in first line: "
- + " for column: " + name
- + ". Will assume the type of column is string");
- }
- } else {
- name = string;
- fieldType = null;
- }
- final RelDataType type;
- if (fieldType == null) {
- type = typeFactory.createJavaType(String.class);
- } else {
- type = fieldType.toType(typeFactory);
- }
- names.add(name);
- types.add(type);
- if (fieldTypes != null) {
- fieldTypes.add(fieldType);
- }
- }
-
- if (names.isEmpty()) {
- names.add("line");
- types.add(typeFactory.createJavaType(String.class));
- }
- return typeFactory.createStructType(Pair.zip(names, types));
- }
-
- public E current() {
- return current;
- }
-
- public boolean moveNext() {
- try {
- outer:
- for (;;) {
- final String[] strings = reader.readNext();
- if (strings == null) {
- current = null;
- reader.close();
- return false;
- }
- if (filterValues != null) {
- for (int i = 0; i < strings.length; i++) {
- String filterValue = filterValues[i];
- if (filterValue != null) {
- if (!filterValue.equals(strings[i])) {
- continue outer;
- }
- }
- }
- }
- current = rowConverter.convertRow(strings);
- return true;
- }
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- public void reset() {
- throw new UnsupportedOperationException();
- }
-
- public void close() {
- try {
- reader.close();
- } catch (IOException e) {
- throw new RuntimeException("Error closing CSV reader", e);
- }
- }
-
- /** Returns an array of integers {0, ..., n - 1}. */
- static int[] identityList(int n) {
- int[] integers = new int[n];
- for (int i = 0; i < n; i++) {
- integers[i] = i;
- }
- return integers;
- }
-
- /** Row converter. */
- abstract static class RowConverter<E> {
- abstract E convertRow(String[] rows);
-
- protected Object convert(CsvFieldType fieldType, String string) {
- if (fieldType == null) {
- return string;
- }
- switch (fieldType) {
- case BOOLEAN:
- if (string.length() == 0) {
- return null;
- }
- return Boolean.parseBoolean(string);
- case BYTE:
- if (string.length() == 0) {
- return null;
- }
- return Byte.parseByte(string);
- case SHORT:
- if (string.length() == 0) {
- return null;
- }
- return Short.parseShort(string);
- case INT:
- if (string.length() == 0) {
- return null;
- }
- return Integer.parseInt(string);
- case LONG:
- if (string.length() == 0) {
- return null;
- }
- return Long.parseLong(string);
- case FLOAT:
- if (string.length() == 0) {
- return null;
- }
- return Float.parseFloat(string);
- case DOUBLE:
- if (string.length() == 0) {
- return null;
- }
- return Double.parseDouble(string);
- case DATE:
- if (string.length() == 0) {
- return null;
- }
- try {
- Date date = TIME_FORMAT_DATE.parse(string);
- return new java.sql.Date(date.getTime());
- } catch (ParseException e) {
- return null;
- }
- case TIME:
- if (string.length() == 0) {
- return null;
- }
- try {
- Date date = TIME_FORMAT_TIME.parse(string);
- return new java.sql.Time(date.getTime());
- } catch (ParseException e) {
- return null;
- }
- case TIMESTAMP:
- if (string.length() == 0) {
- return null;
- }
- try {
- Date date = TIME_FORMAT_TIMESTAMP.parse(string);
- return new java.sql.Timestamp(date.getTime());
- } catch (ParseException e) {
- return null;
- }
- case STRING:
- default:
- return string;
- }
- }
- }
-
- /** Array row converter. */
- static class ArrayRowConverter extends RowConverter<Object[]> {
- private final CsvFieldType[] fieldTypes;
- private final int[] fields;
-
- ArrayRowConverter(List<CsvFieldType> fieldTypes, int[] fields) {
- this.fieldTypes = fieldTypes.toArray(new CsvFieldType[fieldTypes.size()]);
- this.fields = fields;
- }
-
- public Object[] convertRow(String[] strings) {
- final Object[] objects = new Object[fields.length];
- for (int i = 0; i < fields.length; i++) {
- int field = fields[i];
- objects[i] = convert(fieldTypes[field], strings[field]);
- }
- return objects;
- }
- }
-
- /** Single column row converter. */
- private static class SingleColumnRowConverter extends RowConverter {
- private final CsvFieldType fieldType;
- private final int fieldIndex;
-
- private SingleColumnRowConverter(CsvFieldType fieldType, int fieldIndex) {
- this.fieldType = fieldType;
- this.fieldIndex = fieldIndex;
- }
-
- public Object convertRow(String[] strings) {
- return convert(fieldType, strings[fieldIndex]);
- }
- }
-}
-
-// End CsvEnumerator2.java
http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/calcite/adapter/csv/CsvSchema2.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/calcite/adapter/csv/CsvSchema2.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/calcite/adapter/csv/CsvSchema2.java
deleted file mode 100644
index f724f79..0000000
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/calcite/adapter/csv/CsvSchema2.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to you under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.calcite.adapter.csv;
-
-import java.io.Reader;
-import java.util.Map;
-
-import org.apache.calcite.schema.Table;
-import org.apache.calcite.schema.impl.AbstractSchema;
-
-import com.google.common.collect.ImmutableMap;
-
-/**
- * Schema mapped onto a directory of CSV files. Each table in the schema
- * is a CSV file in that directory.
- */
-public class CsvSchema2 extends AbstractSchema {
- final private Map<String, Reader> inputs;
- private final CsvTable.Flavor flavor;
- private Map<String, Table> tableMap;
-
- /**
- * Creates a CSV schema.
- *
- * @param inputs Inputs map
- * @param flavor Whether to instantiate flavor tables that undergo
- * query optimization
- */
- public CsvSchema2(Map<String, Reader> inputs, CsvTable.Flavor flavor) {
- super();
- this.inputs = inputs;
- this.flavor = flavor;
- }
-
- /** Looks for a suffix on a string and returns
- * either the string with the suffix removed
- * or the original string. */
- private static String trim(String s, String suffix) {
- String trimmed = trimOrNull(s, suffix);
- return trimmed != null ? trimmed : s;
- }
-
- /** Looks for a suffix on a string and returns
- * either the string with the suffix removed
- * or null. */
- private static String trimOrNull(String s, String suffix) {
- return s.endsWith(suffix)
- ? s.substring(0, s.length() - suffix.length())
- : null;
- }
-
- @Override protected Map<String, Table> getTableMap() {
-
- if (tableMap!=null)
- return tableMap;
-
- // Build a map from table name to table; each file becomes a table.
- final ImmutableMap.Builder<String, Table> builder = ImmutableMap.builder();
-
- for (Map.Entry<String, Reader> entry : inputs.entrySet()) {
- final Table table = createTable(entry.getValue());
- builder.put(entry.getKey(), table);
- }
-
- tableMap = builder.build();
- return tableMap;
- }
-
- /** Creates different sub-type of table based on the "flavor" attribute. */
- private Table createTable(Reader readerx) {
- switch (flavor) {
- case TRANSLATABLE:
- return new CsvTranslatableTable2(readerx, null);
-// case SCANNABLE:
-// return new CsvScannableTable(file, null);
-// case FILTERABLE:
-// return new CsvFilterableTable(file, null);
- default:
- throw new AssertionError("Unknown flavor " + flavor);
- }
- }
-}
-
-// End CsvSchema2.java
http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/calcite/adapter/csv/CsvSchemaFactory2.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/calcite/adapter/csv/CsvSchemaFactory2.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/calcite/adapter/csv/CsvSchemaFactory2.java
deleted file mode 100644
index f8ec576..0000000
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/calcite/adapter/csv/CsvSchemaFactory2.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to you under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.calcite.adapter.csv;
-
-import java.io.Reader;
-import java.util.Map;
-
-import org.apache.calcite.schema.Schema;
-import org.apache.calcite.schema.SchemaFactory;
-import org.apache.calcite.schema.SchemaPlus;
-
-/**
- * Factory that creates a {@link CsvSchema}.
- *
- * <p>Allows a custom schema to be included in a <code><i>model</i>.json</code>
- * file.</p>
- */
-@SuppressWarnings("UnusedDeclaration")
-public class CsvSchemaFactory2 implements SchemaFactory {
- final private Map<String, Reader> inputs;
- // public constructor, per factory contract
- public CsvSchemaFactory2(Map<String, Reader> inputs) {
- this.inputs = inputs;
- }
-
- public Schema create(SchemaPlus parentSchema, String name, Map<String, Object> operand) {
- String flavorName = (String) operand.get("flavor");
- CsvTable.Flavor flavor;
- if (flavorName == null) {
- flavor = CsvTable.Flavor.SCANNABLE;
- } else {
- flavor = CsvTable.Flavor.valueOf(flavorName.toUpperCase());
- }
-
- return new CsvSchema2(inputs, flavor);
- }
-}
-
-// End CsvSchemaFactory2.java
http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/calcite/adapter/csv/CsvTableScan2.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/calcite/adapter/csv/CsvTableScan2.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/calcite/adapter/csv/CsvTableScan2.java
deleted file mode 100644
index 75f013c..0000000
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/calcite/adapter/csv/CsvTableScan2.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to you under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.calcite.adapter.csv;
-
-import org.apache.calcite.adapter.enumerable.EnumerableConvention;
-import org.apache.calcite.adapter.enumerable.EnumerableRel;
-import org.apache.calcite.adapter.enumerable.EnumerableRelImplementor;
-import org.apache.calcite.adapter.enumerable.PhysType;
-import org.apache.calcite.adapter.enumerable.PhysTypeImpl;
-import org.apache.calcite.linq4j.tree.Blocks;
-import org.apache.calcite.linq4j.tree.Expressions;
-import org.apache.calcite.linq4j.tree.Primitive;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelOptPlanner;
-import org.apache.calcite.plan.RelOptTable;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.RelWriter;
-import org.apache.calcite.rel.core.TableScan;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeFactory;
-import org.apache.calcite.rel.type.RelDataTypeField;
-
-import java.util.List;
-
-/**
- * Relational expression representing a scan of a CSV stream.
- *
- * <p>Like any table scan, it serves as a leaf node of a query tree.</p>
- */
-public class CsvTableScan2 extends TableScan implements EnumerableRel {
- final CsvTranslatableTable2 csvTable;
- final int[] fields;
-
- protected CsvTableScan2(RelOptCluster cluster, RelOptTable table,
- CsvTranslatableTable2 csvTable, int[] fields) {
- super(cluster, cluster.traitSetOf(EnumerableConvention.INSTANCE), table);
- this.csvTable = csvTable;
- this.fields = fields;
-
- assert csvTable != null;
- }
-
- @Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
- assert inputs.isEmpty();
- return new CsvTableScan2(getCluster(), table, csvTable, fields);
- }
-
- @Override public RelWriter explainTerms(RelWriter pw) {
- return super.explainTerms(pw)
- .item("fields", Primitive.asList(fields));
- }
-
- @Override public RelDataType deriveRowType() {
- final List<RelDataTypeField> fieldList = table.getRowType().getFieldList();
- final RelDataTypeFactory.FieldInfoBuilder builder =
- getCluster().getTypeFactory().builder();
- for (int field : fields) {
- builder.add(fieldList.get(field));
- }
- return builder.build();
- }
-
- @Override public void register(RelOptPlanner planner) {
- planner.addRule(CsvProjectTableScanRule.INSTANCE);
- }
-
- public Result implement(EnumerableRelImplementor implementor, Prefer pref) {
- PhysType physType =
- PhysTypeImpl.of(
- implementor.getTypeFactory(),
- getRowType(),
- pref.preferArray());
-
- if (table instanceof JsonTable) {
- return implementor.result(
- physType,
- Blocks.toBlock(
- Expressions.call(table.getExpression(JsonTable.class),
- "enumerable")));
- }
- return implementor.result(
- physType,
- Blocks.toBlock(
- Expressions.call(table.getExpression(CsvTranslatableTable2.class),
- "project", Expressions.constant(fields))));
- }
-}
-
-// End CsvTableScan.java
http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/calcite/adapter/csv/CsvTranslatableTable2.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/calcite/adapter/csv/CsvTranslatableTable2.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/calcite/adapter/csv/CsvTranslatableTable2.java
deleted file mode 100644
index bc28fdd..0000000
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/calcite/adapter/csv/CsvTranslatableTable2.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to you under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.calcite.adapter.csv;
-
-import org.apache.calcite.adapter.java.JavaTypeFactory;
-import org.apache.calcite.linq4j.AbstractEnumerable;
-import org.apache.calcite.linq4j.Enumerable;
-import org.apache.calcite.linq4j.Enumerator;
-import org.apache.calcite.linq4j.QueryProvider;
-import org.apache.calcite.linq4j.Queryable;
-import org.apache.calcite.linq4j.tree.Expression;
-import org.apache.calcite.plan.RelOptTable;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeFactory;
-import org.apache.calcite.rel.type.RelProtoDataType;
-import org.apache.calcite.schema.QueryableTable;
-import org.apache.calcite.schema.SchemaPlus;
-import org.apache.calcite.schema.Schemas;
-import org.apache.calcite.schema.TranslatableTable;
-
-import au.com.bytecode.opencsv.CSVReader;
-
-import java.io.IOException;
-import java.io.Reader;
-import java.lang.reflect.Type;
-import java.util.ArrayList;
-
-/**
- * Table based on a CSV stream.
- */
-public class CsvTranslatableTable2 extends CsvTable
- implements QueryableTable, TranslatableTable {
-
- final private CSVReader csvReader;
- private CsvEnumerator2<Object> csvEnumerator2;
- final private String[] firstLine;
-
- /** Creates a CsvTable.
- */
- CsvTranslatableTable2(Reader readerx, RelProtoDataType protoRowType) {
- super(null, protoRowType);
- this.csvReader = new CSVReader(readerx);
- try {
- this.firstLine = csvReader.readNext();
- } catch (IOException e) {
- throw new RuntimeException("csvReader.readNext() failed ", e);
- }
- }
-
- public String toString() {
- return "CsvTranslatableTable2";
- }
-
- /** Returns an enumerable over a given projection of the fields.
- *
- * <p>Called from generated code. */
- public Enumerable<Object> project(final int[] fields) {
- return new AbstractEnumerable<Object>() {
- public Enumerator<Object> enumerator() {
- return csvEnumerator2;
- }
- };
- }
-
- public Expression getExpression(SchemaPlus schema, String tableName,
- Class clazz) {
- return Schemas.tableExpression(schema, getElementType(), tableName, clazz);
- }
-
- public Type getElementType() {
- return Object[].class;
- }
-
- public <T> Queryable<T> asQueryable(QueryProvider queryProvider,
- SchemaPlus schema, String tableName) {
- throw new UnsupportedOperationException();
- }
-
- public RelNode toRel(
- RelOptTable.ToRelContext context,
- RelOptTable relOptTable) {
- // Request all fields.
- final int fieldCount = relOptTable.getRowType().getFieldCount();
- final int[] fields = CsvEnumerator.identityList(fieldCount);
- return new CsvTableScan2(context.getCluster(), relOptTable, this, fields);
- }
-
- @Override
- public RelDataType getRowType(RelDataTypeFactory typeFactory) {
- RelDataType rowType = null;
-
- if (fieldTypes == null) {
- fieldTypes = new ArrayList<CsvFieldType>();
- rowType = CsvEnumerator2.deduceRowType((JavaTypeFactory) typeFactory, firstLine, fieldTypes);
- } else {
- rowType = CsvEnumerator2.deduceRowType((JavaTypeFactory) typeFactory, firstLine, null);
- }
-
- if (csvEnumerator2==null)
- csvEnumerator2 = new CsvEnumerator2<Object>(csvReader, fieldTypes);
-
- return rowType;
- }
-}
-
-// End CsvTranslatableTable2.java
http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FilterCSVColumns.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FilterCSVColumns.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FilterCSVColumns.java
deleted file mode 100644
index 718f462..0000000
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/FilterCSVColumns.java
+++ /dev/null
@@ -1,258 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.standard;
-
-import static java.sql.Types.CHAR;
-import static java.sql.Types.LONGNVARCHAR;
-import static java.sql.Types.LONGVARCHAR;
-import static java.sql.Types.NCHAR;
-import static java.sql.Types.NVARCHAR;
-import static java.sql.Types.VARCHAR;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.OutputStream;
-import java.io.Reader;
-import java.nio.charset.StandardCharsets;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.ResultSet;
-import java.sql.ResultSetMetaData;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.calcite.adapter.csv.CsvSchemaFactory2;
-import org.apache.calcite.jdbc.CalciteConnection;
-import org.apache.calcite.schema.Schema;
-import org.apache.calcite.schema.SchemaPlus;
-import org.apache.commons.lang3.StringEscapeUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.nifi.annotation.behavior.EventDriven;
-import org.apache.nifi.annotation.behavior.InputRequirement;
-import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
-import org.apache.nifi.annotation.behavior.SideEffectFree;
-import org.apache.nifi.annotation.behavior.SupportsBatching;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.logging.ProcessorLog;
-import org.apache.nifi.processor.AbstractProcessor;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.ProcessorInitializationContext;
-import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.io.StreamCallback;
-import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.stream.io.BufferedInputStream;
-import org.apache.nifi.util.StopWatch;
-
-import com.google.common.collect.ImmutableMap;
-
-@EventDriven
-@SideEffectFree
-@SupportsBatching
-@Tags({"xml", "xslt", "transform"})
-@InputRequirement(Requirement.INPUT_REQUIRED)
-@CapabilityDescription("Filter out specific columns from CSV data. Some other transformations are also supported."
- + "Columns can be renamed, simple calculations performed, aggregations, etc."
- + "SQL select statement is used to specify how CSV data should be transformed."
- + "SQL statement follows standard SQL, some restrictions may apply."
- + "Successfully transformed CSV data is routed to the 'success' relationship."
- + "If transform fails, the original FlowFile is routed to the 'failure' relationship")
-public class FilterCSVColumns extends AbstractProcessor {
-
- public static final PropertyDescriptor SQL_SELECT = new PropertyDescriptor.Builder()
- .name("SQL select statement")
- .description("SQL select statement specifies how CSV data should be transformed. "
- + "Sql select should select from CSV.A table")
- .required(true)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .build();
-
- public static final Relationship REL_SUCCESS = new Relationship.Builder()
- .name("success")
- .description("The FlowFile with transformed content will be routed to this relationship")
- .build();
- public static final Relationship REL_FAILURE = new Relationship.Builder()
- .name("failure")
- .description("If a FlowFile fails processing for any reason (for example, the SQL statement contains columns not present in CSV), it will be routed to this relationship")
- .build();
-
- private List<PropertyDescriptor> properties;
- private Set<Relationship> relationships;
-
- @Override
- protected void init(final ProcessorInitializationContext context) {
- final List<PropertyDescriptor> properties = new ArrayList<>();
- properties.add(SQL_SELECT);
- this.properties = Collections.unmodifiableList(properties);
-
- final Set<Relationship> relationships = new HashSet<>();
- relationships.add(REL_SUCCESS);
- relationships.add(REL_FAILURE);
- this.relationships = Collections.unmodifiableSet(relationships);
- }
-
- @Override
- public Set<Relationship> getRelationships() {
- return relationships;
- }
-
- @Override
- protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
- return properties;
- }
-
- @Override
- public void onTrigger(final ProcessContext context, final ProcessSession session) {
- final FlowFile original = session.get();
- if (original == null) {
- return;
- }
-
- final ProcessorLog logger = getLogger();
- final StopWatch stopWatch = new StopWatch(true);
-
- try {
- FlowFile transformed = session.write(original, new StreamCallback() {
- @Override
- public void process(final InputStream rawIn, final OutputStream out) throws IOException {
- try (final InputStream in = new BufferedInputStream(rawIn)) {
-
- String sql = context.getProperty(SQL_SELECT).getValue();
- final ResultSet resultSet = transform(rawIn, sql);
- convertToCSV(resultSet, out);
-
- } catch (final Exception e) {
- throw new IOException(e);
- }
- }
- });
- session.transfer(transformed, REL_SUCCESS);
- session.getProvenanceReporter().modifyContent(transformed, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
- logger.info("Transformed {}", new Object[]{original});
- } catch (ProcessException e) {
- logger.error("Unable to transform {} due to {}", new Object[]{original, e});
- session.transfer(original, REL_FAILURE);
- }
- }
-
- static protected ResultSet transform(InputStream rawIn, String sql) throws SQLException {
-
- Reader readerx = new InputStreamReader(rawIn);
- HashMap<String, Reader> inputs = new HashMap<>();
- inputs.put("A", readerx);
-
- Statement statement = null;
- final Properties properties = new Properties();
-// properties.setProperty("caseSensitive", "true");
- try (final Connection connection = DriverManager.getConnection("jdbc:calcite:", properties)) {
- final CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);
-
- final SchemaPlus rootSchema = calciteConnection.getRootSchema();
- final Schema schema =
- new CsvSchemaFactory2(inputs)
- .create(rootSchema, "CSV", ImmutableMap.<String, Object>of("flavor", "TRANSLATABLE"));
-
- calciteConnection.getRootSchema().add("CSV", schema);
- rootSchema.add("default", schema);
-
- statement = connection.createStatement();
- final ResultSet resultSet = statement.executeQuery(sql);
- return resultSet;
- }
- }
-
- static protected void convertToCSV(ResultSet resultSet, OutputStream out) throws SQLException, IOException {
-
- convertToCsvStream(resultSet, out);
- }
-
- public static long convertToCsvStream(final ResultSet rs, final OutputStream outStream) throws SQLException, IOException {
- return convertToCsvStream(rs, outStream, null, null);
- }
-
- public static long convertToCsvStream(final ResultSet rs, final OutputStream outStream, String recordName, ResultSetRowCallback callback)
- throws SQLException, IOException {
-
- final ResultSetMetaData meta = rs.getMetaData();
- final int nrOfColumns = meta.getColumnCount();
- List<String> columnNames = new ArrayList<>(nrOfColumns);
-
- for (int i = 1; i <= nrOfColumns; i++) {
- String columnNameFromMeta = meta.getColumnName(i);
- // Hive returns table.column for column name. Grab the column name as the string after the last period
- int columnNameDelimiter = columnNameFromMeta.lastIndexOf(".");
- columnNames.add(columnNameFromMeta.substring(columnNameDelimiter + 1));
- }
-
- // Write column names as header row
- outStream.write(StringUtils.join(columnNames, ",").getBytes(StandardCharsets.UTF_8));
- outStream.write("\n".getBytes(StandardCharsets.UTF_8));
-
- // Iterate over the rows
- long nrOfRows = 0;
- while (rs.next()) {
- if (callback != null) {
- callback.processRow(rs);
- }
- List<String> rowValues = new ArrayList<>(nrOfColumns);
- for (int i = 1; i <= nrOfColumns; i++) {
- final int javaSqlType = meta.getColumnType(i);
- final Object value = rs.getObject(i);
-
- switch (javaSqlType) {
- case CHAR:
- case LONGNVARCHAR:
- case LONGVARCHAR:
- case NCHAR:
- case NVARCHAR:
- case VARCHAR:
- rowValues.add("\"" + StringEscapeUtils.escapeCsv(rs.getString(i)) + "\"");
- break;
- default:
- rowValues.add(value.toString());
- }
- }
- // Write row values
- outStream.write(StringUtils.join(rowValues, ",").getBytes(StandardCharsets.UTF_8));
- outStream.write("\n".getBytes(StandardCharsets.UTF_8));
- nrOfRows++;
- }
- return nrOfRows;
- }
-
- /**
- * An interface for callback methods which allows processing of a row during the convertToXYZStream() processing.
- * <b>IMPORTANT:</b> This method should only work on the row pointed at by the current ResultSet reference.
- * Advancing the cursor (e.g.) can cause rows to be skipped during Avro transformation.
- */
- public interface ResultSetRowCallback {
- void processRow(ResultSet resultSet) throws IOException;
- }
-}