Posted to commits@nifi.apache.org by mt...@apache.org on 2018/09/18 12:11:13 UTC

nifi git commit: NIFI-5510: Introducing PutCassandraRecord processor NIFI-5510: Fixes for PR review comments

Repository: nifi
Updated Branches:
  refs/heads/master 7e627f2fe -> c56a7e9ba


NIFI-5510: Introducing PutCassandraRecord processor
NIFI-5510: Fixes for PR review comments

This closes #2992

Signed-off-by: Mike Thomsen <mi...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/c56a7e9b
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/c56a7e9b
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/c56a7e9b

Branch: refs/heads/master
Commit: c56a7e9ba5808ead129d462c369e73c710dd3145
Parents: 7e627f2
Author: zenfenan <si...@gmail.com>
Authored: Fri Aug 17 15:26:47 2018 +0530
Committer: Mike Thomsen <mi...@gmail.com>
Committed: Tue Sep 18 08:10:14 2018 -0400

----------------------------------------------------------------------
 .../nifi-cassandra-processors/pom.xml           |  15 ++
 .../cassandra/AbstractCassandraProcessor.java   |  18 +-
 .../processors/cassandra/PutCassandraQL.java    |  14 --
 .../cassandra/PutCassandraRecord.java           | 222 +++++++++++++++++++
 .../processors/cassandra/QueryCassandra.java    |  14 --
 .../org.apache.nifi.processor.Processor         |  33 +--
 .../cassandra/PutCassandraRecordIT.java         | 123 ++++++++++
 .../cassandra/PutCassandraRecordTest.java       | 198 +++++++++++++++++
 8 files changed, 591 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/c56a7e9b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/pom.xml b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/pom.xml
index 0338d21..d5c36d4 100644
--- a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/pom.xml
@@ -64,5 +64,20 @@
             <artifactId>nifi-ssl-context-service</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-serialization-service-api</artifactId>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record</artifactId>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock-record-utils</artifactId>
+            <version>1.8.0-SNAPSHOT</version>
+        </dependency>
     </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/c56a7e9b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/AbstractCassandraProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/AbstractCassandraProcessor.java b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/AbstractCassandraProcessor.java
index b0f82f9..d1e3874 100644
--- a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/AbstractCassandraProcessor.java
+++ b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/AbstractCassandraProcessor.java
@@ -37,6 +37,7 @@ import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.security.util.SslContextFactory;
 import org.apache.nifi.ssl.SSLContextService;
@@ -72,8 +73,8 @@ public abstract class AbstractCassandraProcessor extends AbstractProcessor {
 
     public static final PropertyDescriptor KEYSPACE = new PropertyDescriptor.Builder()
             .name("Keyspace")
-            .description("The Cassandra Keyspace to connect to. If no keyspace is specified, the query will need to "
-                    + "include the keyspace name before any table reference.")
+            .description("The Cassandra Keyspace to connect to. If not set, the keyspace name has to be provided with the " +
+                    "table name in the form of <KEYSPACE>.<TABLE>")
             .required(false)
             .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
@@ -131,6 +132,19 @@ public abstract class AbstractCassandraProcessor extends AbstractProcessor {
             .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
             .build();
 
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("A FlowFile is transferred to this relationship if the operation completed successfully.")
+            .build();
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("A FlowFile is transferred to this relationship if the operation failed.")
+            .build();
+    public static final Relationship REL_RETRY = new Relationship.Builder().name("retry")
+            .description("A FlowFile is transferred to this relationship if the operation cannot be completed but attempting "
+                    + "it again may succeed.")
+            .build();
+
     static List<PropertyDescriptor> descriptors = new ArrayList<>();
 
     static {
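
A side note on the hunk above: REL_SUCCESS, REL_FAILURE and REL_RETRY are now declared once on AbstractCassandraProcessor, which is why the per-processor copies are removed from PutCassandraQL and QueryCassandra later in this diff. Below is a minimal, hypothetical subclass (ExampleCassandraProcessor is not part of the commit) sketching how a concrete processor exposes just the shared relationships it routes to — the same pattern PutCassandraRecord follows further down.

    package org.apache.nifi.processors.cassandra;

    import org.apache.nifi.processor.ProcessContext;
    import org.apache.nifi.processor.ProcessSession;
    import org.apache.nifi.processor.Relationship;
    import org.apache.nifi.processor.exception.ProcessException;

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.HashSet;
    import java.util.Set;

    public class ExampleCassandraProcessor extends AbstractCassandraProcessor {

        // Reuse the relationships declared on the abstract base class; this
        // sketch only routes to success and failure, like PutCassandraRecord.
        private static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(
                new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE)));

        @Override
        public Set<Relationship> getRelationships() {
            return RELATIONSHIPS;
        }

        @Override
        public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
            // No-op placeholder; a real processor would transfer FlowFiles to
            // the relationships exposed above.
        }
    }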

http://git-wip-us.apache.org/repos/asf/nifi/blob/c56a7e9b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraQL.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraQL.java b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraQL.java
index d6f13de..1b14874 100644
--- a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraQL.java
+++ b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraQL.java
@@ -119,20 +119,6 @@ public class PutCassandraQL extends AbstractCassandraProcessor {
 
     private final static List<PropertyDescriptor> propertyDescriptors;
 
-    // Relationships
-    public static final Relationship REL_SUCCESS = new Relationship.Builder()
-            .name("success")
-            .description("Successfully executed CQL statement.")
-            .build();
-    public static final Relationship REL_FAILURE = new Relationship.Builder()
-            .name("failure")
-            .description("CQL statement execution failed.")
-            .build();
-    public static final Relationship REL_RETRY = new Relationship.Builder().name("retry")
-            .description("A FlowFile is transferred to this relationship if the statement cannot be executed successfully but "
-                    + "attempting the operation again may succeed.")
-            .build();
-
     private final static Set<Relationship> relationships;
 
     private static final Pattern CQL_TYPE_ATTRIBUTE_PATTERN = Pattern.compile("cql\\.args\\.(\\d+)\\.type");

http://git-wip-us.apache.org/repos/asf/nifi/blob/c56a7e9b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraRecord.java b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraRecord.java
new file mode 100644
index 0000000..402ec3d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraRecord.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.cassandra;
+
+import com.datastax.driver.core.BatchStatement;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.exceptions.AuthenticationException;
+import com.datastax.driver.core.exceptions.NoHostAvailableException;
+import com.datastax.driver.core.querybuilder.Insert;
+import com.datastax.driver.core.querybuilder.QueryBuilder;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnShutdown;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+import org.apache.nifi.util.StopWatch;
+
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+@Tags({"cassandra", "cql", "put", "insert", "update", "set", "record"})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@CapabilityDescription("This is a record aware processor that reads the content of the incoming FlowFile as individual records using the " +
+        "configured 'Record Reader' and writes them to Apache Cassandra using native protocol version 3 or higher.")
+public class PutCassandraRecord extends AbstractCassandraProcessor {
+
+    static final PropertyDescriptor RECORD_READER_FACTORY = new PropertyDescriptor.Builder()
+            .name("put-cassandra-record-reader")
+            .displayName("Record Reader")
+            .description("Specifies the type of Record Reader controller service to use for parsing the incoming data " +
+                    "and determining the schema")
+            .identifiesControllerService(RecordReaderFactory.class)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor TABLE = new PropertyDescriptor.Builder()
+            .name("put-cassandra-record-table")
+            .displayName("Table name")
+            .description("The name of the Cassandra table to which the records have to be written.")
+            .required(true)
+            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
+            .name("put-cassandra-record-batch-size")
+            .displayName("Batch size")
+            .description("Specifies the number of 'Insert statements' to be grouped together to execute as a batch (BatchStatement)")
+            .defaultValue("100")
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor BATCH_STATEMENT_TYPE = new PropertyDescriptor.Builder()
+            .name("put-cassandra-record-batch-statement-type")
+            .displayName("Batch Statement Type")
+            .description("Specifies the type of 'Batch Statement' to be used.")
+            .allowableValues(BatchStatement.Type.values())
+            .defaultValue(BatchStatement.Type.LOGGED.toString())
+            .required(false)
+            .build();
+
+    static final PropertyDescriptor CONSISTENCY_LEVEL = new PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(AbstractCassandraProcessor.CONSISTENCY_LEVEL)
+            .allowableValues(ConsistencyLevel.SERIAL.name(), ConsistencyLevel.LOCAL_SERIAL.name())
+            .defaultValue(ConsistencyLevel.SERIAL.name())
+            .build();
+
+    private final static List<PropertyDescriptor> propertyDescriptors = Collections.unmodifiableList(Arrays.asList(
+            CONTACT_POINTS, KEYSPACE, TABLE, CLIENT_AUTH, USERNAME, PASSWORD, RECORD_READER_FACTORY,
+            BATCH_SIZE, CONSISTENCY_LEVEL, BATCH_STATEMENT_TYPE, PROP_SSL_CONTEXT_SERVICE));
+
+    private final static Set<Relationship> relationships = Collections.unmodifiableSet(
+            new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE)));
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return propertyDescriptors;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @OnScheduled
+    public void onScheduled(ProcessContext context) {
+        try {
+            connectToCassandra(context);
+        } catch (NoHostAvailableException nhae) {
+            getLogger().error("No host in the Cassandra cluster can be contacted successfully to execute this statement", nhae);
+            getLogger().error(nhae.getCustomMessage(10, true, false));
+            throw new ProcessException(nhae);
+        } catch (AuthenticationException ae) {
+            getLogger().error("Invalid username/password combination", ae);
+            throw new ProcessException(ae);
+        }
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        FlowFile inputFlowFile = session.get();
+
+        if (inputFlowFile == null) {
+            return;
+        }
+
+        final String cassandraTable = context.getProperty(TABLE).evaluateAttributeExpressions(inputFlowFile).getValue();
+        final RecordReaderFactory recordParserFactory = context.getProperty(RECORD_READER_FACTORY).asControllerService(RecordReaderFactory.class);
+        final int batchSize = context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger();
+        final String batchStatementType = context.getProperty(BATCH_STATEMENT_TYPE).getValue();
+        final String serialConsistencyLevel = context.getProperty(CONSISTENCY_LEVEL).getValue();
+
+        final BatchStatement batchStatement;
+        final Session connectionSession = cassandraSession.get();
+        final AtomicInteger recordsAdded = new AtomicInteger(0);
+        final StopWatch stopWatch = new StopWatch(true);
+
+        boolean error = false;
+
+        try (final InputStream inputStream = session.read(inputFlowFile);
+             final RecordReader reader = recordParserFactory.createRecordReader(inputFlowFile, inputStream, getLogger())){
+
+            final RecordSchema schema = reader.getSchema();
+            Record record;
+
+            batchStatement = new BatchStatement(BatchStatement.Type.valueOf(batchStatementType));
+            batchStatement.setSerialConsistencyLevel(ConsistencyLevel.valueOf(serialConsistencyLevel));
+
+            while((record = reader.nextRecord()) != null) {
+                Map<String, Object> recordContentMap = (Map<String, Object>) DataTypeUtils
+                        .convertRecordFieldtoObject(record, RecordFieldType.RECORD.getRecordDataType(record.getSchema()));
+                Insert insertQuery;
+
+                if (cassandraTable.contains(".")) {
+                    String keyspaceAndTable[] = cassandraTable.split("\\.");
+                    insertQuery = QueryBuilder.insertInto(keyspaceAndTable[0], keyspaceAndTable[1]);
+                } else {
+                    insertQuery = QueryBuilder.insertInto(cassandraTable);
+                }
+                for (String fieldName : schema.getFieldNames()) {
+                    insertQuery.value(fieldName, recordContentMap.get(fieldName));
+                }
+                batchStatement.add(insertQuery);
+
+                if (recordsAdded.incrementAndGet() == batchSize) {
+                    connectionSession.execute(batchStatement);
+                    batchStatement.clear();
+                }
+            }
+
+            if (batchStatement.size() != 0) {
+                connectionSession.execute(batchStatement);
+                batchStatement.clear();
+            }
+
+        } catch (Exception e) {
+            error = true;
+            getLogger().error("Unable to write the records into Cassandra table due to {}", new Object[] {e});
+            session.transfer(inputFlowFile, REL_FAILURE);
+        } finally {
+            if (!error) {
+                stopWatch.stop();
+                long duration = stopWatch.getDuration(TimeUnit.MILLISECONDS);
+                String transitUri = "cassandra://" + connectionSession.getCluster().getMetadata().getClusterName() + "." + cassandraTable;
+
+                session.getProvenanceReporter().send(inputFlowFile, transitUri, "Inserted " + recordsAdded.get() + " records", duration);
+                session.transfer(inputFlowFile, REL_SUCCESS);
+            }
+        }
+
+    }
+
+    @OnUnscheduled
+    public void stop() {
+        super.stop();
+    }
+
+    @OnShutdown
+    public void shutdown() {
+        super.stop();
+    }
+
+}
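
A minimal, standalone sketch (not taken from the commit) of the mapping that the onTrigger loop above performs: each record is flattened into a field map, turned into a CQL INSERT via the DataStax 3.x QueryBuilder, and accumulated into a BatchStatement that is executed against the session. The RecordToInsertSketch class name is made up, and the keyspace/table names are borrowed from the integration test further down; the driver calls are the same ones the new processor imports.

    import com.datastax.driver.core.BatchStatement;
    import com.datastax.driver.core.ConsistencyLevel;
    import com.datastax.driver.core.Session;
    import com.datastax.driver.core.querybuilder.Insert;
    import com.datastax.driver.core.querybuilder.QueryBuilder;

    import java.util.Map;

    public class RecordToInsertSketch {

        // Roughly what PutCassandraRecord does per record: one INSERT per
        // flattened record map, grouped into a single batch.
        static void writeBatch(Session session, Iterable<Map<String, Object>> records) {
            BatchStatement batch = new BatchStatement(BatchStatement.Type.LOGGED);
            batch.setSerialConsistencyLevel(ConsistencyLevel.SERIAL);

            for (Map<String, Object> recordContentMap : records) {
                // Keyspace/table names mirror those used in PutCassandraRecordIT.
                Insert insert = QueryBuilder.insertInto("sample_keyspace", "sample_table");
                for (Map.Entry<String, Object> field : recordContentMap.entrySet()) {
                    insert.value(field.getKey(), field.getValue());
                }
                batch.add(insert);
            }

            // The processor flushes every 'Batch size' records; here the whole
            // batch is executed at once for brevity.
            session.execute(batch);
        }
    }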

http://git-wip-us.apache.org/repos/asf/nifi/blob/c56a7e9b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java
index 384c395..40b88cc 100644
--- a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java
+++ b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java
@@ -132,20 +132,6 @@ public class QueryCassandra extends AbstractCassandraProcessor {
 
     private final static List<PropertyDescriptor> propertyDescriptors;
 
-    // Relationships
-    public static final Relationship REL_SUCCESS = new Relationship.Builder()
-            .name("success")
-            .description("Successfully created FlowFile from CQL query result set.")
-            .build();
-    public static final Relationship REL_FAILURE = new Relationship.Builder()
-            .name("failure")
-            .description("CQL query execution failed.")
-            .build();
-    public static final Relationship REL_RETRY = new Relationship.Builder().name("retry")
-            .description("A FlowFile is transferred to this relationship if the query cannot be completed but attempting "
-                    + "the operation again may succeed.")
-            .build();
-
     private final static Set<Relationship> relationships;
 
     /*

http://git-wip-us.apache.org/repos/asf/nifi/blob/c56a7e9b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 34cf4ef..88aa0d8 100644
--- a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -1,16 +1,17 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-org.apache.nifi.processors.cassandra.QueryCassandra
-org.apache.nifi.processors.cassandra.PutCassandraQL
\ No newline at end of file
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+org.apache.nifi.processors.cassandra.QueryCassandra
+org.apache.nifi.processors.cassandra.PutCassandraQL
+org.apache.nifi.processors.cassandra.PutCassandraRecord
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/c56a7e9b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/PutCassandraRecordIT.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/PutCassandraRecordIT.java b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/PutCassandraRecordIT.java
new file mode 100644
index 0000000..07b9ac6
--- /dev/null
+++ b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/PutCassandraRecordIT.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.cassandra;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.querybuilder.QueryBuilder;
+import com.datastax.driver.core.querybuilder.Select;
+import com.datastax.driver.core.querybuilder.Truncate;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.record.MockRecordParser;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class PutCassandraRecordIT {
+
+    private static TestRunner testRunner;
+    private static MockRecordParser recordReader;
+
+    private static Cluster cluster;
+    private static Session session;
+
+    private static final String KEYSPACE = "sample_keyspace";
+    private static final String TABLE = "sample_table";
+    private static final String HOST = "localhost";
+    private static final int PORT = 9042;
+
+    @BeforeClass
+    public static void setup() throws InitializationException {
+        recordReader = new MockRecordParser();
+        testRunner = TestRunners.newTestRunner(PutCassandraRecord.class);
+
+        testRunner.setProperty(PutCassandraRecord.RECORD_READER_FACTORY, "reader");
+        testRunner.setProperty(PutCassandraRecord.CONTACT_POINTS, HOST + ":" + PORT);
+        testRunner.setProperty(PutCassandraRecord.KEYSPACE, KEYSPACE);
+        testRunner.setProperty(PutCassandraRecord.TABLE, TABLE);
+        testRunner.setProperty(PutCassandraRecord.CONSISTENCY_LEVEL, "SERIAL");
+        testRunner.setProperty(PutCassandraRecord.BATCH_STATEMENT_TYPE, "LOGGED");
+        testRunner.addControllerService("reader", recordReader);
+        testRunner.enableControllerService(recordReader);
+
+        cluster = Cluster.builder().addContactPoint(HOST).withPort(PORT).build();
+        session = cluster.connect();
+
+        String createKeyspace = "CREATE KEYSPACE IF NOT EXISTS " + KEYSPACE + " WITH replication = {'class':'SimpleStrategy','replication_factor':1};";
+        String createTable = "CREATE TABLE IF NOT EXISTS " + KEYSPACE + "." + TABLE + "(id int PRIMARY KEY, name text, age int);";
+
+        session.execute(createKeyspace);
+        session.execute(createTable);
+    }
+
+    @Test
+    public void testSimplePut() {
+        recordReader.addSchemaField("id", RecordFieldType.INT);
+        recordReader.addSchemaField("name", RecordFieldType.STRING);
+        recordReader.addSchemaField("age", RecordFieldType.INT);
+
+        recordReader.addRecord(1, "Ram", 42);
+        recordReader.addRecord(2, "Jeane", 47);
+        recordReader.addRecord(3, "Ilamaran", 27);
+        recordReader.addRecord(4, "Jian", 14);
+        recordReader.addRecord(5, "Sakura", 24);
+
+        testRunner.enqueue("");
+        testRunner.run();
+
+        testRunner.assertAllFlowFilesTransferred(PutCassandraRecord.REL_SUCCESS, 1);
+        Assert.assertEquals(5, getRecordsCount());
+    }
+
+    private int getRecordsCount() {
+        Select selectQuery = QueryBuilder.select().all().from(KEYSPACE, TABLE);
+        ResultSet result = session.execute(selectQuery);
+
+        List<Integer> resultsList = result.all()
+                .stream()
+                .map(r -> r.getInt(0))
+                .collect(Collectors.toList());
+
+        dropRecords();
+        return resultsList.size();
+    }
+
+    private void dropRecords() {
+        Truncate query = QueryBuilder.truncate(KEYSPACE, TABLE);
+        session.execute(query);
+    }
+
+    @AfterClass
+    public static void shutdown() {
+        String dropKeyspace = "DROP KEYSPACE " + KEYSPACE;
+        String dropTable = "DROP TABLE IF EXISTS " + KEYSPACE + "." + TABLE;
+
+        session.execute(dropTable);
+        session.execute(dropKeyspace);
+
+        session.close();
+        cluster.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/c56a7e9b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/PutCassandraRecordTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/PutCassandraRecordTest.java b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/PutCassandraRecordTest.java
new file mode 100644
index 0000000..88a9b5d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/PutCassandraRecordTest.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.cassandra;
+
+import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Configuration;
+import com.datastax.driver.core.Metadata;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.Statement;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.record.MockRecordParser;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.net.ssl.SSLContext;
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class PutCassandraRecordTest {
+
+    private TestRunner testRunner;
+    private MockRecordParser recordReader;
+
+    @Before
+    public void setUp() throws Exception {
+        MockPutCassandraRecord processor = new MockPutCassandraRecord();
+        recordReader = new MockRecordParser();
+        testRunner = TestRunners.newTestRunner(processor);
+        testRunner.setProperty(PutCassandraRecord.RECORD_READER_FACTORY, "reader");
+    }
+
+    @Test
+    public void testProcessorConfigValidity() throws InitializationException {
+        testRunner.setProperty(PutCassandraRecord.CONTACT_POINTS, "localhost:9042");
+        testRunner.assertNotValid();
+
+        testRunner.setProperty(PutCassandraRecord.PASSWORD, "password");
+        testRunner.assertNotValid();
+
+        testRunner.setProperty(PutCassandraRecord.USERNAME, "username");
+        testRunner.assertNotValid();
+
+        testRunner.setProperty(PutCassandraRecord.CONSISTENCY_LEVEL, "SERIAL");
+        testRunner.assertNotValid();
+
+        testRunner.setProperty(PutCassandraRecord.BATCH_STATEMENT_TYPE, "LOGGED");
+        testRunner.assertNotValid();
+
+        testRunner.setProperty(PutCassandraRecord.KEYSPACE, "sampleks");
+        testRunner.assertNotValid();
+
+        testRunner.setProperty(PutCassandraRecord.TABLE, "sampletbl");
+        testRunner.assertNotValid();
+
+        testRunner.addControllerService("reader", recordReader);
+        testRunner.enableControllerService(recordReader);
+        testRunner.assertValid();
+    }
+
+    private void setUpStandardTestConfig() throws InitializationException {
+        testRunner.setProperty(AbstractCassandraProcessor.CONTACT_POINTS, "localhost:9042");
+        testRunner.setProperty(AbstractCassandraProcessor.PASSWORD, "password");
+        testRunner.setProperty(AbstractCassandraProcessor.USERNAME, "username");
+        testRunner.setProperty(PutCassandraRecord.CONSISTENCY_LEVEL, "SERIAL");
+        testRunner.setProperty(PutCassandraRecord.BATCH_STATEMENT_TYPE, "LOGGED");
+        testRunner.setProperty(PutCassandraRecord.TABLE, "sampleks.sampletbl");
+        testRunner.addControllerService("reader", recordReader);
+        testRunner.enableControllerService(recordReader);
+    }
+
+    @Test
+    public void testSimplePut() throws InitializationException {
+        setUpStandardTestConfig();
+
+        recordReader.addSchemaField("name", RecordFieldType.STRING);
+        recordReader.addSchemaField("age", RecordFieldType.INT);
+        recordReader.addSchemaField("sport", RecordFieldType.STRING);
+
+        recordReader.addRecord("John Doe", 48, "Soccer");
+        recordReader.addRecord("Jane Doe", 47, "Tennis");
+        recordReader.addRecord("Sally Doe", 47, "Curling");
+        recordReader.addRecord("Jimmy Doe", 14, null);
+        recordReader.addRecord("Pizza Doe", 14, null);
+
+        testRunner.enqueue("");
+        testRunner.run();
+
+        testRunner.assertAllFlowFilesTransferred(PutCassandraRecord.REL_SUCCESS, 1);
+    }
+
+    @Test
+    public void testEL() throws InitializationException {
+        testRunner.setProperty(PutCassandraRecord.CONTACT_POINTS, "${contact.points}");
+        testRunner.setProperty(PutCassandraRecord.PASSWORD, "${pass}");
+        testRunner.setProperty(PutCassandraRecord.USERNAME, "${user}");
+        testRunner.setProperty(PutCassandraRecord.CONSISTENCY_LEVEL, "SERIAL");
+        testRunner.setProperty(PutCassandraRecord.BATCH_STATEMENT_TYPE, "LOGGED");
+        testRunner.setProperty(PutCassandraRecord.TABLE, "sampleks.sampletbl");
+        testRunner.addControllerService("reader", recordReader);
+        testRunner.enableControllerService(recordReader);
+
+        testRunner.assertValid();
+
+        testRunner.setVariable("contact.points", "localhost:9042");
+        testRunner.setVariable("user", "username");
+        testRunner.setVariable("pass", "password");
+
+        recordReader.addSchemaField("name", RecordFieldType.STRING);
+        recordReader.addSchemaField("age", RecordFieldType.INT);
+        recordReader.addSchemaField("sport", RecordFieldType.STRING);
+
+        recordReader.addRecord("John Doe", 48, "Soccer");
+        recordReader.addRecord("Jane Doe", 47, "Tennis");
+        recordReader.addRecord("Sally Doe", 47, "Curling");
+        recordReader.addRecord("Jimmy Doe", 14, null);
+        recordReader.addRecord("Pizza Doe", 14, null);
+
+        testRunner.enqueue("");
+        testRunner.run(1, true, true);
+        testRunner.assertAllFlowFilesTransferred(PutCassandraRecord.REL_SUCCESS, 1);
+    }
+
+    private static class MockPutCassandraRecord extends PutCassandraRecord {
+        private Exception exceptionToThrow = null;
+        private Session mockSession = mock(Session.class);
+
+        @Override
+        protected Cluster createCluster(List<InetSocketAddress> contactPoints, SSLContext sslContext,
+                                        String username, String password) {
+            Cluster mockCluster = mock(Cluster.class);
+            try {
+                Metadata mockMetadata = mock(Metadata.class);
+                when(mockMetadata.getClusterName()).thenReturn("cluster1");
+                when(mockCluster.getMetadata()).thenReturn(mockMetadata);
+                when(mockCluster.connect()).thenReturn(mockSession);
+                when(mockCluster.connect(anyString())).thenReturn(mockSession);
+                Configuration config = Configuration.builder().build();
+                when(mockCluster.getConfiguration()).thenReturn(config);
+                ResultSetFuture future = mock(ResultSetFuture.class);
+                ResultSet rs = CassandraQueryTestUtil.createMockResultSet();
+                PreparedStatement ps = mock(PreparedStatement.class);
+                when(mockSession.prepare(anyString())).thenReturn(ps);
+                BoundStatement bs = mock(BoundStatement.class);
+                when(ps.bind()).thenReturn(bs);
+                when(future.getUninterruptibly()).thenReturn(rs);
+                try {
+                    doReturn(rs).when(future).getUninterruptibly(anyLong(), any(TimeUnit.class));
+                } catch (TimeoutException te) {
+                    throw new IllegalArgumentException("Mocked cluster doesn't time out");
+                }
+                if (exceptionToThrow != null) {
+                    doThrow(exceptionToThrow).when(mockSession).executeAsync(anyString());
+                    doThrow(exceptionToThrow).when(mockSession).executeAsync(any(Statement.class));
+
+                } else {
+                    when(mockSession.executeAsync(anyString())).thenReturn(future);
+                    when(mockSession.executeAsync(any(Statement.class))).thenReturn(future);
+                }
+                when(mockSession.getCluster()).thenReturn(mockCluster);
+            } catch (Exception e) {
+                fail(e.getMessage());
+            }
+            return mockCluster;
+        }
+    }
+}