You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by pv...@apache.org on 2017/08/11 16:05:44 UTC

nifi git commit: NIFI-4275 Adding support for specifying the timestamp on PutHBase processors

Repository: nifi
Updated Branches:
  refs/heads/master 760fd75be -> f7da7e67f


NIFI-4275 Adding support for specifying the timestamp on PutHBase processors

Signed-off-by: Pierre Villard <pi...@gmail.com>

This closes #2070.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/f7da7e67
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/f7da7e67
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/f7da7e67

Branch: refs/heads/master
Commit: f7da7e67f49fe9a22eb5357410f0bffd9f589276
Parents: 760fd75
Author: Bryan Bende <bb...@apache.org>
Authored: Wed Aug 9 14:01:19 2017 -0400
Committer: Pierre Villard <pi...@gmail.com>
Committed: Fri Aug 11 18:04:13 2017 +0200

----------------------------------------------------------------------
 .../org/apache/nifi/hbase/AbstractPutHBase.java |  7 ++
 .../org/apache/nifi/hbase/PutHBaseCell.java     | 18 +++-
 .../org/apache/nifi/hbase/PutHBaseJSON.java     | 21 ++++-
 .../org/apache/nifi/hbase/PutHBaseRecord.java   | 46 +++++++++--
 .../org/apache/nifi/hbase/HBaseTestUtil.java    |  8 +-
 .../org/apache/nifi/hbase/TestPutHBaseCell.java | 87 +++++++++++++++++---
 .../org/apache/nifi/hbase/TestPutHBaseJSON.java | 59 +++++++++++++
 .../org/apache/nifi/hbase/put/PutColumn.java    | 10 +++
 .../nifi/hbase/HBase_1_1_2_ClientService.java   | 16 +++-
 9 files changed, 248 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/f7da7e67/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/AbstractPutHBase.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/AbstractPutHBase.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/AbstractPutHBase.java
index 3aa9054..237bc03 100644
--- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/AbstractPutHBase.java
+++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/AbstractPutHBase.java
@@ -98,6 +98,13 @@ public abstract class AbstractPutHBase extends AbstractProcessor {
             .expressionLanguageSupported(true)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .build();
+    protected static final PropertyDescriptor TIMESTAMP = new PropertyDescriptor.Builder()
+            .name("timestamp")
+            .displayName("Timestamp")
+            .description("The timestamp for the cells being created in HBase. This field can be left blank and HBase will use the current time.")
+            .expressionLanguageSupported(true)
+            .addValidator(StandardValidators.POSITIVE_LONG_VALIDATOR)
+            .build();
     protected static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
             .name("Batch Size")
             .description("The maximum number of FlowFiles to process in a single execution. The FlowFiles will be " +

http://git-wip-us.apache.org/repos/asf/nifi/blob/f7da7e67/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseCell.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseCell.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseCell.java
index eb1f636..122e38d 100644
--- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseCell.java
+++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseCell.java
@@ -30,6 +30,7 @@ import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.io.InputStreamCallback;
 import org.apache.nifi.stream.io.StreamUtils;
+import org.apache.nifi.util.StringUtils;
 
 import java.io.IOException;
 import java.io.InputStream;
@@ -57,6 +58,7 @@ public class PutHBaseCell extends AbstractPutHBase {
         properties.add(ROW_ID_ENCODING_STRATEGY);
         properties.add(COLUMN_FAMILY);
         properties.add(COLUMN_QUALIFIER);
+        properties.add(TIMESTAMP);
         properties.add(BATCH_SIZE);
         return properties;
     }
@@ -75,6 +77,20 @@ public class PutHBaseCell extends AbstractPutHBase {
         final String row = context.getProperty(ROW_ID).evaluateAttributeExpressions(flowFile).getValue();
         final String columnFamily = context.getProperty(COLUMN_FAMILY).evaluateAttributeExpressions(flowFile).getValue();
         final String columnQualifier = context.getProperty(COLUMN_QUALIFIER).evaluateAttributeExpressions(flowFile).getValue();
+        final String timestampValue = context.getProperty(TIMESTAMP).evaluateAttributeExpressions(flowFile).getValue();
+
+        final Long timestamp;
+        if (!StringUtils.isBlank(timestampValue)) {
+            try {
+                timestamp = Long.valueOf(timestampValue);
+            } catch (Exception e) {
+                getLogger().error("Invalid timestamp value: " + timestampValue, e);
+                return null;
+            }
+        } else {
+            timestamp = null;
+        }
+
 
         final byte[] buffer = new byte[(int) flowFile.getSize()];
         session.read(flowFile, new InputStreamCallback() {
@@ -86,7 +102,7 @@ public class PutHBaseCell extends AbstractPutHBase {
 
 
         final Collection<PutColumn> columns = Collections.singletonList(new PutColumn(columnFamily.getBytes(StandardCharsets.UTF_8),
-                                                                            columnQualifier.getBytes(StandardCharsets.UTF_8), buffer));
+                                                                            columnQualifier.getBytes(StandardCharsets.UTF_8), buffer, timestamp));
         byte[] rowKeyBytes = getRow(row,context.getProperty(ROW_ID_ENCODING_STRATEGY).getValue());
 
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/f7da7e67/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseJSON.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseJSON.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseJSON.java
index 1294d9b..2cb5a13 100644
--- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseJSON.java
+++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseJSON.java
@@ -117,6 +117,7 @@ public class PutHBaseJSON extends AbstractPutHBase {
         properties.add(ROW_FIELD_NAME);
         properties.add(ROW_ID_ENCODING_STRATEGY);
         properties.add(COLUMN_FAMILY);
+        properties.add(TIMESTAMP);
         properties.add(BATCH_SIZE);
         properties.add(COMPLEX_FIELD_STRATEGY);
         properties.add(FIELD_ENCODING_STRATEGY);
@@ -161,11 +162,24 @@ public class PutHBaseJSON extends AbstractPutHBase {
         final String rowId = context.getProperty(ROW_ID).evaluateAttributeExpressions(flowFile).getValue();
         final String rowFieldName = context.getProperty(ROW_FIELD_NAME).evaluateAttributeExpressions(flowFile).getValue();
         final String columnFamily = context.getProperty(COLUMN_FAMILY).evaluateAttributeExpressions(flowFile).getValue();
+        final String timestampValue = context.getProperty(TIMESTAMP).evaluateAttributeExpressions(flowFile).getValue();
         final boolean extractRowId = !StringUtils.isBlank(rowFieldName);
         final String complexFieldStrategy = context.getProperty(COMPLEX_FIELD_STRATEGY).getValue();
         final String fieldEncodingStrategy = context.getProperty(FIELD_ENCODING_STRATEGY).getValue();
         final String rowIdEncodingStrategy = context.getProperty(ROW_ID_ENCODING_STRATEGY).getValue();
 
+        final Long timestamp;
+        if (!StringUtils.isBlank(timestampValue)) {
+            try {
+                timestamp = Long.valueOf(timestampValue);
+            } catch (Exception e) {
+                getLogger().error("Invalid timestamp value: " + timestampValue, e);
+                return null;
+            }
+        } else {
+            timestamp = null;
+        }
+
         // Parse the JSON document
         final ObjectMapper mapper = new ObjectMapper();
         final AtomicReference<JsonNode> rootNodeRef = new AtomicReference<>(null);
@@ -238,7 +252,10 @@ public class PutHBaseJSON extends AbstractPutHBase {
                 if (extractRowId && fieldName.equals(rowFieldName)) {
                     rowIdHolder.set(fieldNode.asText());
                 } else {
-                    columns.add(new PutColumn(columnFamily.getBytes(StandardCharsets.UTF_8), fieldName.getBytes(StandardCharsets.UTF_8), fieldValueHolder.get()));
+                    final byte[] colFamBytes = columnFamily.getBytes(StandardCharsets.UTF_8);
+                    final byte[] colQualBytes = fieldName.getBytes(StandardCharsets.UTF_8);
+                    final byte[] colValBytes = fieldValueHolder.get();
+                    columns.add(new PutColumn(colFamBytes, colQualBytes, colValBytes, timestamp));
                 }
             }
         }
@@ -253,7 +270,7 @@ public class PutHBaseJSON extends AbstractPutHBase {
 
         final String putRowId = (extractRowId ? rowIdHolder.get() : rowId);
 
-        byte[] rowKeyBytes = getRow(putRowId,context.getProperty(ROW_ID_ENCODING_STRATEGY).getValue());
+        byte[] rowKeyBytes = getRow(putRowId, rowIdEncodingStrategy);
         return new PutFlowFile(tableName, rowKeyBytes, columns, flowFile);
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/f7da7e67/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseRecord.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseRecord.java
index 8aa84ea..90b2fdb 100755
--- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseRecord.java
+++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/main/java/org/apache/nifi/hbase/PutHBaseRecord.java
@@ -38,6 +38,8 @@ import org.apache.nifi.serialization.RecordReaderFactory;
 import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.RecordFieldType;
 import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.util.IllegalTypeConversionException;
+import org.apache.nifi.util.StringUtils;
 
 import java.io.IOException;
 import java.io.InputStream;
@@ -58,6 +60,17 @@ public class PutHBaseRecord extends AbstractPutHBase {
     protected static final PropertyDescriptor ROW_FIELD_NAME = new PropertyDescriptor.Builder()
             .name("Row Identifier Field Name")
             .description("Specifies the name of a record field whose value should be used as the row id for the given record.")
+            .required(true)
+            .expressionLanguageSupported(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    protected static final PropertyDescriptor TIMESTAMP_FIELD_NAME = new PropertyDescriptor.Builder()
+            .name("timestamp-field-name")
+            .displayName("Timestamp Field Name")
+            .description("Specifies the name of a record field whose value should be used as the timestamp for the cells in HBase. " +
+                    "The value of this field must be a number, string, or date that can be converted to a long. " +
+                    "If this field is left blank, HBase will use the current time.")
             .expressionLanguageSupported(true)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .build();
@@ -123,6 +136,7 @@ public class PutHBaseRecord extends AbstractPutHBase {
         properties.add(ROW_FIELD_NAME);
         properties.add(ROW_ID_ENCODING_STRATEGY);
         properties.add(COLUMN_FAMILY);
+        properties.add(TIMESTAMP_FIELD_NAME);
         properties.add(BATCH_SIZE);
         properties.add(COMPLEX_FIELD_STRATEGY);
         properties.add(FIELD_ENCODING_STRATEGY);
@@ -161,6 +175,7 @@ public class PutHBaseRecord extends AbstractPutHBase {
         final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
         final String rowFieldName = context.getProperty(ROW_FIELD_NAME).evaluateAttributeExpressions(flowFile).getValue();
         final String columnFamily = context.getProperty(COLUMN_FAMILY).evaluateAttributeExpressions(flowFile).getValue();
+        final String timestampFieldName = context.getProperty(TIMESTAMP_FIELD_NAME).evaluateAttributeExpressions(flowFile).getValue();
         final String fieldEncodingStrategy = context.getProperty(FIELD_ENCODING_STRATEGY).getValue();
         final String complexFieldStrategy = context.getProperty(COMPLEX_FIELD_STRATEGY).getValue();
         final String rowEncodingStrategy = context.getProperty(ROW_ID_ENCODING_STRATEGY).getValue();
@@ -184,7 +199,8 @@ public class PutHBaseRecord extends AbstractPutHBase {
             }
 
             while ((record = reader.nextRecord()) != null) {
-                PutFlowFile putFlowFile = createPut(context, record, reader.getSchema(), flowFile, rowFieldName, columnFamily, fieldEncodingStrategy, rowEncodingStrategy, complexFieldStrategy);
+                PutFlowFile putFlowFile = createPut(context, record, reader.getSchema(), flowFile, rowFieldName, columnFamily,
+                        timestampFieldName, fieldEncodingStrategy, rowEncodingStrategy, complexFieldStrategy);
                 flowFiles.add(putFlowFile);
                 index++;
 
@@ -307,8 +323,9 @@ public class PutHBaseRecord extends AbstractPutHBase {
         }
     }
 
-    protected PutFlowFile createPut(ProcessContext context, Record record, RecordSchema schema, FlowFile flowFile,
-                                    String rowFieldName, String columnFamily, String fieldEncodingStrategy, String rowEncodingStrategy, String complexFieldStrategy)
+    protected PutFlowFile createPut(ProcessContext context, Record record, RecordSchema schema, FlowFile flowFile, String rowFieldName,
+                                    String columnFamily, String timestampFieldName, String fieldEncodingStrategy, String rowEncodingStrategy,
+                                    String complexFieldStrategy)
             throws PutCreationFailedInvokedException {
         PutFlowFile retVal = null;
         final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
@@ -318,17 +335,33 @@ public class PutHBaseRecord extends AbstractPutHBase {
         final byte[] fam  = clientService.toBytes(columnFamily);
 
         if (record != null) {
+            final Long timestamp;
+            if (!StringUtils.isBlank(timestampFieldName)) {
+                try {
+                    timestamp = record.getAsLong(timestampFieldName);
+                } catch (IllegalTypeConversionException e) {
+                    throw new PutCreationFailedInvokedException("Could not convert " + timestampFieldName + " to a long", e);
+                }
+
+                if (timestamp == null) {
+                    getLogger().warn("The value of timestamp field " + timestampFieldName + " was null, record will be inserted with latest timestamp");
+                }
+            } else {
+                timestamp = null;
+            }
+
             List<PutColumn> columns = new ArrayList<>();
             for (String name : schema.getFieldNames()) {
-                if (name.equals(rowFieldName)) {
+                if (name.equals(rowFieldName) || name.equals(timestampFieldName)) {
                     continue;
                 }
 
                 final byte[] fieldValueBytes = asBytes(name, schema.getField(name).get().getDataType().getFieldType(), record, asString, complexFieldStrategy);
                 if (fieldValueBytes != null) {
-                    columns.add(new PutColumn(fam, clientService.toBytes(name), fieldValueBytes));
+                    columns.add(new PutColumn(fam, clientService.toBytes(name), fieldValueBytes, timestamp));
                 }
             }
+
             String rowIdValue = record.getAsString(rowFieldName);
             if (rowIdValue == null) {
                 throw new PutCreationFailedInvokedException(String.format("Row ID was null for flowfile with ID %s", flowFile.getAttribute("uuid")));
@@ -345,5 +378,8 @@ public class PutHBaseRecord extends AbstractPutHBase {
         PutCreationFailedInvokedException(String msg) {
             super(msg);
         }
+        PutCreationFailedInvokedException(String msg, Exception e) {
+            super(msg, e);
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/f7da7e67/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/HBaseTestUtil.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/HBaseTestUtil.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/HBaseTestUtil.java
index f86e611..fa598d0 100644
--- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/HBaseTestUtil.java
+++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/HBaseTestUtil.java
@@ -34,6 +34,10 @@ import org.apache.nifi.util.TestRunner;
 public class HBaseTestUtil {
 
     public static void verifyPut(final String row, final String columnFamily, final Map<String,byte[]> columns, final List<PutFlowFile> puts) {
+        verifyPut(row, columnFamily, null, columns, puts);
+    }
+
+    public static void verifyPut(final String row, final String columnFamily, final Long timestamp, final Map<String,byte[]> columns, final List<PutFlowFile> puts) {
         boolean foundPut = false;
 
         for (final PutFlowFile put : puts) {
@@ -54,7 +58,9 @@ public class HBaseTestUtil {
                 for (PutColumn putColumn : put.getColumns()) {
                     if (columnFamily.equals(new String(putColumn.getColumnFamily(), StandardCharsets.UTF_8))
                             && entry.getKey().equals(new String(putColumn.getColumnQualifier(), StandardCharsets.UTF_8))
-                            && Arrays.equals(entry.getValue(), putColumn.getBuffer())) {
+                            && Arrays.equals(entry.getValue(), putColumn.getBuffer())
+                            && ((timestamp == null && putColumn.getTimestamp() == null)
+                                    || (timestamp != null && timestamp.equals(putColumn.getTimestamp())) )) {
                         foundColumn = true;
                         break;
                     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/f7da7e67/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseCell.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseCell.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseCell.java
index ee6a53f..f3fb434 100644
--- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseCell.java
+++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseCell.java
@@ -36,7 +36,7 @@ import static org.junit.Assert.assertNotNull;
 public class TestPutHBaseCell {
 
     @Test
-    public void testSingleFlowFile() throws IOException, InitializationException {
+    public void testSingleFlowFileNoTimestamp() throws IOException, InitializationException {
         final String tableName = "nifi";
         final String row = "row1";
         final String columnFamily = "family1";
@@ -64,26 +64,89 @@ public class TestPutHBaseCell {
 
         List<PutFlowFile> puts = hBaseClient.getFlowFilePuts().get(tableName);
         assertEquals(1, puts.size());
-        verifyPut(row, columnFamily, columnQualifier, content, puts.get(0));
+        verifyPut(row, columnFamily, columnQualifier, null, content, puts.get(0));
 
         assertEquals(1, runner.getProvenanceEvents().size());
     }
 
     @Test
+    public void testSingleFlowFileWithTimestamp() throws IOException, InitializationException {
+        final String tableName = "nifi";
+        final String row = "row1";
+        final String columnFamily = "family1";
+        final String columnQualifier = "qualifier1";
+        final Long timestamp = 1L;
+
+        final TestRunner runner = TestRunners.newTestRunner(PutHBaseCell.class);
+        runner.setProperty(PutHBaseCell.TABLE_NAME, tableName);
+        runner.setProperty(PutHBaseCell.ROW_ID, row);
+        runner.setProperty(PutHBaseCell.COLUMN_FAMILY, columnFamily);
+        runner.setProperty(PutHBaseCell.COLUMN_QUALIFIER, columnQualifier);
+        runner.setProperty(PutHBaseCell.TIMESTAMP, timestamp.toString());
+        runner.setProperty(PutHBaseCell.BATCH_SIZE, "1");
+
+        final MockHBaseClientService hBaseClient = getHBaseClientService(runner);
+
+        final String content = "some content";
+        runner.enqueue(content.getBytes("UTF-8"));
+        runner.run();
+        runner.assertAllFlowFilesTransferred(PutHBaseCell.REL_SUCCESS);
+
+        final MockFlowFile outFile = runner.getFlowFilesForRelationship(PutHBaseCell.REL_SUCCESS).get(0);
+        outFile.assertContentEquals(content);
+
+        assertNotNull(hBaseClient.getFlowFilePuts());
+        assertEquals(1, hBaseClient.getFlowFilePuts().size());
+
+        List<PutFlowFile> puts = hBaseClient.getFlowFilePuts().get(tableName);
+        assertEquals(1, puts.size());
+        verifyPut(row, columnFamily, columnQualifier, timestamp, content, puts.get(0));
+
+        assertEquals(1, runner.getProvenanceEvents().size());
+    }
+
+    @Test
+    public void testSingleFlowFileWithInvalidTimestamp() throws IOException, InitializationException {
+        final String tableName = "nifi";
+        final String row = "row1";
+        final String columnFamily = "family1";
+        final String columnQualifier = "qualifier1";
+        final String timestamp = "not-a-timestamp";
+
+        final PutHBaseCell proc = new PutHBaseCell();
+        final TestRunner runner = getTestRunnerWithEL(proc);
+        runner.setProperty(PutHBaseCell.TIMESTAMP, "${hbase.timestamp}");
+        runner.setProperty(PutHBaseCell.BATCH_SIZE, "1");
+
+        final MockHBaseClientService hBaseClient = getHBaseClientService(runner);
+
+        final String content = "some content";
+        final Map<String, String> attributes = getAttributeMapWithEL(tableName, row, columnFamily, columnQualifier);
+        attributes.put("hbase.timestamp", timestamp);
+        runner.enqueue(content.getBytes("UTF-8"), attributes);
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(PutHBaseCell.REL_FAILURE, 1);
+    }
+
+    @Test
     public void testSingleFlowFileWithEL() throws IOException, InitializationException {
         final String tableName = "nifi";
         final String row = "row1";
         final String columnFamily = "family1";
         final String columnQualifier = "qualifier1";
+        final Long timestamp = 1L;
 
         final PutHBaseCell proc = new PutHBaseCell();
         final TestRunner runner = getTestRunnerWithEL(proc);
+        runner.setProperty(PutHBaseCell.TIMESTAMP, "${hbase.timestamp}");
         runner.setProperty(PutHBaseCell.BATCH_SIZE, "1");
 
         final MockHBaseClientService hBaseClient = getHBaseClientService(runner);
 
         final String content = "some content";
         final Map<String, String> attributes = getAttributeMapWithEL(tableName, row, columnFamily, columnQualifier);
+        attributes.put("hbase.timestamp", timestamp.toString());
         runner.enqueue(content.getBytes("UTF-8"), attributes);
 
         runner.run();
@@ -97,7 +160,7 @@ public class TestPutHBaseCell {
 
         List<PutFlowFile> puts = hBaseClient.getFlowFilePuts().get(tableName);
         assertEquals(1, puts.size());
-        verifyPut(row, columnFamily, columnQualifier, content, puts.get(0));
+        verifyPut(row, columnFamily, columnQualifier, timestamp, content, puts.get(0));
 
         assertEquals(1, runner.getProvenanceEvents().size());
     }
@@ -185,8 +248,8 @@ public class TestPutHBaseCell {
 
         List<PutFlowFile> puts = hBaseClient.getFlowFilePuts().get(tableName);
         assertEquals(2, puts.size());
-        verifyPut(row1, columnFamily, columnQualifier, content1, puts.get(0));
-        verifyPut(row2, columnFamily, columnQualifier, content2, puts.get(1));
+        verifyPut(row1, columnFamily, columnQualifier, null, content1, puts.get(0));
+        verifyPut(row2, columnFamily, columnQualifier, null, content2, puts.get(1));
 
         assertEquals(2, runner.getProvenanceEvents().size());
     }
@@ -247,8 +310,8 @@ public class TestPutHBaseCell {
 
         List<PutFlowFile> puts = hBaseClient.getFlowFilePuts().get(tableName);
         assertEquals(2, puts.size());
-        verifyPut(row, columnFamily, columnQualifier, content1, puts.get(0));
-        verifyPut(row, columnFamily, columnQualifier, content2, puts.get(1));
+        verifyPut(row, columnFamily, columnQualifier, null, content1, puts.get(0));
+        verifyPut(row, columnFamily, columnQualifier, null, content2, puts.get(1));
 
         assertEquals(2, runner.getProvenanceEvents().size());
     }
@@ -295,10 +358,11 @@ public class TestPutHBaseCell {
 
         List<PutFlowFile> puts = hBaseClient.getFlowFilePuts().get(tableName);
         assertEquals(1, puts.size());
-        verifyPut(expectedRowKey, columnFamily.getBytes(StandardCharsets.UTF_8), columnQualifier.getBytes(StandardCharsets.UTF_8), content, puts.get(0));
+        verifyPut(expectedRowKey, columnFamily.getBytes(StandardCharsets.UTF_8), columnQualifier.getBytes(StandardCharsets.UTF_8), null, content, puts.get(0));
 
         assertEquals(1, runner.getProvenanceEvents().size());
     }
+
     private Map<String, String> getAttributeMapWithEL(String tableName, String row, String columnFamily, String columnQualifier) {
         final Map<String,String> attributes1 = new HashMap<>();
         attributes1.put("hbase.tableName", tableName);
@@ -325,11 +389,11 @@ public class TestPutHBaseCell {
         return hBaseClient;
     }
 
-    private void verifyPut(String row, String columnFamily, String columnQualifier, String content, PutFlowFile put) {
+    private void verifyPut(String row, String columnFamily, String columnQualifier, Long timestamp, String content, PutFlowFile put) {
         verifyPut(row.getBytes(StandardCharsets.UTF_8),columnFamily.getBytes(StandardCharsets.UTF_8),
-                                columnQualifier.getBytes(StandardCharsets.UTF_8),content,put);
+                                columnQualifier.getBytes(StandardCharsets.UTF_8), timestamp, content, put);
     }
-    private void verifyPut(byte[] row, byte[] columnFamily, byte[] columnQualifier, String content, PutFlowFile put) {
+    private void verifyPut(byte[] row, byte[] columnFamily, byte[] columnQualifier, Long timestamp, String content, PutFlowFile put) {
         assertEquals(new String(row, StandardCharsets.UTF_8), new String(put.getRow(), StandardCharsets.UTF_8));
 
         assertNotNull(put.getColumns());
@@ -339,6 +403,7 @@ public class TestPutHBaseCell {
         assertEquals(new String(columnFamily, StandardCharsets.UTF_8), new String(column.getColumnFamily(), StandardCharsets.UTF_8));
         assertEquals(new String(columnQualifier, StandardCharsets.UTF_8), new String(column.getColumnQualifier(), StandardCharsets.UTF_8));
         assertEquals(content, new String(column.getBuffer(), StandardCharsets.UTF_8));
+        assertEquals(timestamp, column.getTimestamp());
     }
 
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/f7da7e67/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseJSON.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseJSON.java b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseJSON.java
index d20d354..ee799e4 100644
--- a/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseJSON.java
+++ b/nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/src/test/java/org/apache/nifi/hbase/TestPutHBaseJSON.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertNotNull;
 import static org.apache.nifi.hbase.HBaseTestUtil.getHBaseClientService;
 
 import java.io.IOException;
+import java.io.UnsupportedEncodingException;
 import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.List;
@@ -41,6 +42,7 @@ public class TestPutHBaseJSON {
     public static final String DEFAULT_TABLE_NAME = "nifi";
     public static final String DEFAULT_ROW = "row1";
     public static final String DEFAULT_COLUMN_FAMILY = "family1";
+    public static final Long DEFAULT_TIMESTAMP = 1L;
 
     @Test
     public void testCustomValidate() throws InitializationException {
@@ -441,6 +443,63 @@ public class TestPutHBaseJSON {
         runner.assertAllFlowFilesTransferred(PutHBaseCell.REL_FAILURE, 1);
     }
 
+    @Test
+    public void testTimestamp() throws UnsupportedEncodingException, InitializationException {
+        final TestRunner runner = getTestRunner(DEFAULT_TABLE_NAME, DEFAULT_COLUMN_FAMILY, "1");
+        final MockHBaseClientService hBaseClient = getHBaseClientService(runner);
+        runner.setProperty(PutHBaseJSON.ROW_ID, DEFAULT_ROW);
+        runner.setProperty(PutHBaseJSON.TIMESTAMP, DEFAULT_TIMESTAMP.toString());
+
+        final String content = "{ \"field1\" : \"value1\", \"field2\" : \"value2\" }";
+        runner.enqueue(content.getBytes("UTF-8"));
+        runner.run();
+        runner.assertAllFlowFilesTransferred(PutHBaseCell.REL_SUCCESS);
+
+        final MockFlowFile outFile = runner.getFlowFilesForRelationship(PutHBaseCell.REL_SUCCESS).get(0);
+        outFile.assertContentEquals(content);
+
+        assertNotNull(hBaseClient.getFlowFilePuts());
+        assertEquals(1, hBaseClient.getFlowFilePuts().size());
+
+        final List<PutFlowFile> puts = hBaseClient.getFlowFilePuts().get(DEFAULT_TABLE_NAME);
+        assertEquals(1, puts.size());
+
+        final Map<String,byte[]> expectedColumns = new HashMap<>();
+        expectedColumns.put("field1", hBaseClient.toBytes("value1"));
+        expectedColumns.put("field2", hBaseClient.toBytes("value2"));
+        HBaseTestUtil.verifyPut(DEFAULT_ROW, DEFAULT_COLUMN_FAMILY, DEFAULT_TIMESTAMP, expectedColumns, puts);
+    }
+
+    @Test
+    public void testTimestampWithEL() throws UnsupportedEncodingException, InitializationException {
+        final TestRunner runner = getTestRunner(DEFAULT_TABLE_NAME, DEFAULT_COLUMN_FAMILY, "1");
+        final MockHBaseClientService hBaseClient = getHBaseClientService(runner);
+        runner.setProperty(PutHBaseJSON.ROW_ID, DEFAULT_ROW);
+        runner.setProperty(PutHBaseJSON.TIMESTAMP, "${hbase.timestamp}");
+
+        final Map<String,String> attributes = new HashMap<>();
+        attributes.put("hbase.timestamp", DEFAULT_TIMESTAMP.toString());
+
+        final String content = "{ \"field1\" : \"value1\", \"field2\" : \"value2\" }";
+        runner.enqueue(content.getBytes("UTF-8"), attributes);
+        runner.run();
+        runner.assertAllFlowFilesTransferred(PutHBaseCell.REL_SUCCESS);
+
+        final MockFlowFile outFile = runner.getFlowFilesForRelationship(PutHBaseCell.REL_SUCCESS).get(0);
+        outFile.assertContentEquals(content);
+
+        assertNotNull(hBaseClient.getFlowFilePuts());
+        assertEquals(1, hBaseClient.getFlowFilePuts().size());
+
+        final List<PutFlowFile> puts = hBaseClient.getFlowFilePuts().get(DEFAULT_TABLE_NAME);
+        assertEquals(1, puts.size());
+
+        final Map<String,byte[]> expectedColumns = new HashMap<>();
+        expectedColumns.put("field1", hBaseClient.toBytes("value1"));
+        expectedColumns.put("field2", hBaseClient.toBytes("value2"));
+        HBaseTestUtil.verifyPut(DEFAULT_ROW, DEFAULT_COLUMN_FAMILY, DEFAULT_TIMESTAMP, expectedColumns, puts);
+    }
+
     private TestRunner getTestRunner(String table, String columnFamily, String batchSize) {
         final TestRunner runner = TestRunners.newTestRunner(PutHBaseJSON.class);
         runner.setProperty(PutHBaseJSON.TABLE_NAME, table);

http://git-wip-us.apache.org/repos/asf/nifi/blob/f7da7e67/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/put/PutColumn.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/put/PutColumn.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/put/PutColumn.java
index 7921bc2..b29e032 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/put/PutColumn.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase-client-service-api/src/main/java/org/apache/nifi/hbase/put/PutColumn.java
@@ -24,12 +24,18 @@ public class PutColumn {
     private final byte[] columnFamily;
     private final byte[] columnQualifier;
     private final byte[] buffer;
+    private final Long timestamp;
 
 
     public PutColumn(final byte[] columnFamily, final byte[] columnQualifier, final byte[] buffer) {
+        this(columnFamily, columnQualifier, buffer, null);
+    }
+
+    public PutColumn(final byte[] columnFamily, final byte[] columnQualifier, final byte[] buffer, final Long timestamp) {
         this.columnFamily = columnFamily;
         this.columnQualifier = columnQualifier;
         this.buffer = buffer;
+        this.timestamp = timestamp;
     }
 
     public byte[] getColumnFamily() {
@@ -44,4 +50,8 @@ public class PutColumn {
         return buffer;
     }
 
+    public Long getTimestamp() {
+        return timestamp;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/f7da7e67/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java
index f6ac852..53f5834 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-hbase_1_1_2-client-service-bundle/nifi-hbase_1_1_2-client-service/src/main/java/org/apache/nifi/hbase/HBase_1_1_2_ClientService.java
@@ -284,10 +284,18 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme
                 }
 
                 for (final PutColumn column : putFlowFile.getColumns()) {
-                    put.addColumn(
-                            column.getColumnFamily(),
-                            column.getColumnQualifier(),
-                            column.getBuffer());
+                    if (column.getTimestamp() != null) {
+                        put.addColumn(
+                                column.getColumnFamily(),
+                                column.getColumnQualifier(),
+                                column.getTimestamp(),
+                                column.getBuffer());
+                    } else {
+                        put.addColumn(
+                                column.getColumnFamily(),
+                                column.getColumnQualifier(),
+                                column.getBuffer());
+                    }
                 }
             }