Posted to commits@flume.apache.org by sz...@apache.org on 2018/06/08 08:12:04 UTC

[2/2] flume git commit: FLUME-3142: Adding HBase2 sink

FLUME-3142: Adding HBase2 sink

HBase2Sink is the equivalent of HBaseSink for HBase version 2.
HBaseSink used API calls that were deprecated in HBase 1.x
and are no longer available in HBase 2.x.

HBase2Sink has been implemented by copying the existing
flume-ng-hbase-sink module to the new flume-ng-hbase2-sink module,
then adjusting the incompatible API calls to HBase 2.
The package and class names have also been modified to have
the hbase2/HBase2 tag. "Hbase" typos have been fixed too.

The functionality provided by HBase2Sink and the configuration parameters
are the same as for HBaseSink (except for the hbase2 tag in the sink type
and the package/class names).

HBaseSink has not been modified, so it works with HBase 1.x as before.
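
For context, the main incompatibility is in the client write path: the
HTable-centric calls of the HBase 1.x client are replaced by the HBase 2
Connection/BufferedMutator API, which is what HBase2Sink uses internally.
A minimal sketch of the 2.x write path follows; the table, column family,
and values are illustrative only, not taken from this commit:

  import java.io.IOException;

  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.apache.hadoop.hbase.TableName;
  import org.apache.hadoop.hbase.client.BufferedMutator;
  import org.apache.hadoop.hbase.client.Connection;
  import org.apache.hadoop.hbase.client.ConnectionFactory;
  import org.apache.hadoop.hbase.client.Put;
  import org.apache.hadoop.hbase.util.Bytes;

  public class HBase2WriteSketch {
    public static void main(String[] args) throws IOException {
      // Connection and BufferedMutator replace the old HTable-based API.
      try (Connection conn =
               ConnectionFactory.createConnection(HBaseConfiguration.create());
           BufferedMutator table =
               conn.getBufferedMutator(TableName.valueOf("foo_table"))) {
        Put put = new Put(Bytes.toBytes("row1"));
        put.addColumn(Bytes.toBytes("bar_cf"), Bytes.toBytes("pCol"),
            Bytes.toBytes("payload"));
        table.mutate(put);  // buffered on the client side...
        table.flush();      // ...and flushed explicitly, as HBase2Sink does per batch
      }
    }
  }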

This closes #209

Reviewers: Denes Arvay, Endre Major, Ferenc Szabo

(Peter Turcsanyi via Ferenc Szabo)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/719afe90
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/719afe90
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/719afe90

Branch: refs/heads/trunk
Commit: 719afe908d39a87cb046dcd974eebaa7886a00e8
Parents: 3ada170
Author: Ferenc Szabo <sz...@apache.org>
Authored: Fri Jun 8 10:10:26 2018 +0200
Committer: Ferenc Szabo <sz...@apache.org>
Committed: Fri Jun 8 10:10:26 2018 +0200

----------------------------------------------------------------------
 .../flume/conf/sink/SinkConfiguration.java      |   7 +
 .../org/apache/flume/conf/sink/SinkType.java    |   7 +
 flume-ng-dist/pom.xml                           |   4 +
 flume-ng-doc/sphinx/FlumeUserGuide.rst          |  42 +
 flume-ng-sinks/flume-ng-hbase2-sink/pom.xml     | 214 +++++
 .../apache/flume/sink/hbase2/BatchAware.java    |  28 +
 .../sink/hbase2/HBase2EventSerializer.java      |  61 ++
 .../apache/flume/sink/hbase2/HBase2Sink.java    | 548 +++++++++++++
 .../HBase2SinkConfigurationConstants.java       |  77 ++
 .../sink/hbase2/RegexHBase2EventSerializer.java | 214 +++++
 .../hbase2/SimpleHBase2EventSerializer.java     | 151 ++++
 .../sink/hbase2/SimpleRowKeyGenerator.java      |  46 ++
 .../sink/hbase2/IncrementHBase2Serializer.java  |  80 ++
 .../hbase2/MockSimpleHBase2EventSerializer.java |  38 +
 .../flume/sink/hbase2/TestHBase2Sink.java       | 780 +++++++++++++++++++
 .../sink/hbase2/TestHBase2SinkCreation.java     |  48 ++
 .../hbase2/TestRegexHBase2EventSerializer.java  | 238 ++++++
 flume-ng-sinks/pom.xml                          |   1 +
 pom.xml                                         |   8 +
 19 files changed, 2592 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/719afe90/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkConfiguration.java
----------------------------------------------------------------------
diff --git a/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkConfiguration.java b/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkConfiguration.java
index 4538792..09430d8 100644
--- a/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkConfiguration.java
+++ b/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkConfiguration.java
@@ -136,6 +136,13 @@ public class SinkConfiguration extends ComponentConfiguration {
     ASYNCHBASE("org.apache.flume.sink.hbase.HBaseSinkConfiguration"),
 
     /**
+     * HBase2 sink
+     *
+     * @see org.apache.flume.sink.hbase2.HBase2Sink
+     */
+    HBASE2("org.apache.flume.sink.hbase2.HBase2SinkConfiguration"),
+
+    /**
      * MorphlineSolr sink
      *
      * @see org.apache.flume.sink.solr.morphline.MorphlineSolrSink

http://git-wip-us.apache.org/repos/asf/flume/blob/719afe90/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkType.java
----------------------------------------------------------------------
diff --git a/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkType.java b/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkType.java
index c3f8cac..645dba0 100644
--- a/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkType.java
+++ b/flume-ng-configuration/src/main/java/org/apache/flume/conf/sink/SinkType.java
@@ -97,6 +97,13 @@ public enum SinkType implements ComponentWithClassName {
   ASYNCHBASE("org.apache.flume.sink.hbase.AsyncHBaseSink"),
 
   /**
+   * HBase2 sink
+   *
+   * @see org.apache.flume.sink.hbase2.HBase2Sink
+   */
+  HBASE2("org.apache.flume.sink.hbase2.HBase2Sink"),
+
+  /**
    * MorphlineSolr sink
    *
    * @see org.apache.flume.sink.solr.morphline.MorphlineSolrSink

http://git-wip-us.apache.org/repos/asf/flume/blob/719afe90/flume-ng-dist/pom.xml
----------------------------------------------------------------------
diff --git a/flume-ng-dist/pom.xml b/flume-ng-dist/pom.xml
index 1140e1d..e963bda 100644
--- a/flume-ng-dist/pom.xml
+++ b/flume-ng-dist/pom.xml
@@ -154,6 +154,10 @@
     </dependency>
     <dependency>
       <groupId>org.apache.flume.flume-ng-sinks</groupId>
+      <artifactId>flume-ng-hbase2-sink</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.flume.flume-ng-sinks</groupId>
       <artifactId>flume-http-sink</artifactId>
     </dependency>
     <dependency>

http://git-wip-us.apache.org/repos/asf/flume/blob/719afe90/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index 4e70bcc..af91f9e 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -2531,6 +2531,46 @@ Example for agent named a1:
   a1.sinks.k1.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer
   a1.sinks.k1.channel = c1
 
+HBase2Sink
+''''''''''
+
+HBase2Sink is the equivalent of HBaseSink for HBase version 2.
+The provided functionality and the configuration parameters are the same as for HBaseSink (except for the hbase2 tag in the sink type and the package/class names).
+
+The type is the FQCN: org.apache.flume.sink.hbase2.HBase2Sink.
+
+Required properties are in **bold**.
+
+==================  ========================================================  ==============================================================================
+Property Name       Default                                                   Description
+==================  ========================================================  ==============================================================================
+**channel**         --
+**type**            --                                                        The component type name, needs to be ``hbase2``
+**table**           --                                                        The name of the table in HBase to write to.
+**columnFamily**    --                                                        The column family in HBase to write to.
+zookeeperQuorum     --                                                        The quorum spec. This is the value for the property ``hbase.zookeeper.quorum`` in hbase-site.xml
+znodeParent         /hbase                                                    The base path for the znode for the -ROOT- region. Value of ``zookeeper.znode.parent`` in hbase-site.xml
+batchSize           100                                                       Number of events to be written per txn.
+coalesceIncrements  false                                                     Should the sink coalesce multiple increments to a cell per batch. This might give
+                                                                              better performance if there are multiple increments to a limited number of cells.
+serializer          org.apache.flume.sink.hbase2.SimpleHBase2EventSerializer  Default increment column = "iCol", payload column = "pCol".
+serializer.*        --                                                        Properties to be passed to the serializer.
+kerberosPrincipal   --                                                        Kerberos user principal for accessing secure HBase
+kerberosKeytab      --                                                        Kerberos keytab for accessing secure HBase
+==================  ========================================================  ==============================================================================
+
+Example for agent named a1:
+
+.. code-block:: properties
+
+  a1.channels = c1
+  a1.sinks = k1
+  a1.sinks.k1.type = hbase2
+  a1.sinks.k1.table = foo_table
+  a1.sinks.k1.columnFamily = bar_cf
+  a1.sinks.k1.serializer = org.apache.flume.sink.hbase2.RegexHBase2EventSerializer
+  a1.sinks.k1.channel = c1
+
 AsyncHBaseSink
 ''''''''''''''
 
@@ -2541,6 +2581,7 @@ to HBase. This sink uses the `Asynchbase API <https://github.com/OpenTSDB/asynch
 HBase. This sink provides the same consistency guarantees as HBase,
 which is currently row-wise atomicity. In the event of Hbase failing to
 write certain events, the sink will replay all events in that transaction.
+AsyncHBaseSink can only be used with HBase 1.x. The async client library used by AsyncHBaseSink is not available for HBase 2.
 The type is the FQCN: org.apache.flume.sink.hbase.AsyncHBaseSink.
 Required properties are in **bold**.
 
@@ -4989,6 +5030,7 @@ org.apache.flume.Sink                                         logger
 org.apache.flume.Sink                                         avro                    org.apache.flume.sink.AvroSink
 org.apache.flume.Sink                                         hdfs                    org.apache.flume.sink.hdfs.HDFSEventSink
 org.apache.flume.Sink                                         hbase                   org.apache.flume.sink.hbase.HBaseSink
+org.apache.flume.Sink                                         hbase2                  org.apache.flume.sink.hbase2.HBase2Sink
 org.apache.flume.Sink                                         asynchbase              org.apache.flume.sink.hbase.AsyncHBaseSink
 org.apache.flume.Sink                                         elasticsearch           org.apache.flume.sink.elasticsearch.ElasticSearchSink
 org.apache.flume.Sink                                         file_roll               org.apache.flume.sink.RollingFileSink

http://git-wip-us.apache.org/repos/asf/flume/blob/719afe90/flume-ng-sinks/flume-ng-hbase2-sink/pom.xml
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase2-sink/pom.xml b/flume-ng-sinks/flume-ng-hbase2-sink/pom.xml
new file mode 100644
index 0000000..7419a88
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-hbase2-sink/pom.xml
@@ -0,0 +1,214 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
+  license agreements. See the NOTICE file distributed with this work for additional
+  information regarding copyright ownership. The ASF licenses this file to
+  You under the Apache License, Version 2.0 (the "License"); you may not use
+  this file except in compliance with the License. You may obtain a copy of
+  the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+  by applicable law or agreed to in writing, software distributed under the
+  License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
+  OF ANY KIND, either express or implied. See the License for the specific
+  language governing permissions and limitations under the License. -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <artifactId>flume-ng-sinks</artifactId>
+    <groupId>org.apache.flume</groupId>
+    <version>1.9.0-SNAPSHOT</version>
+  </parent>
+  <groupId>org.apache.flume.flume-ng-sinks</groupId>
+  <artifactId>flume-ng-hbase2-sink</artifactId>
+  <name>Flume NG HBase2 Sink</name>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.rat</groupId>
+        <artifactId>apache-rat-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
+  <dependencyManagement>
+    <dependencies>
+      <!-- HBase2 dependencies -->
+      <dependency>
+        <groupId>org.apache.hbase</groupId>
+        <artifactId>hbase-common</artifactId>
+        <version>${hbase2.version}</version>
+      </dependency>
+
+      <dependency>
+        <groupId>org.apache.hbase</groupId>
+        <artifactId>hbase-common</artifactId>
+        <version>${hbase2.version}</version>
+        <type>test-jar</type>
+      </dependency>
+
+      <dependency>
+        <groupId>org.apache.hbase</groupId>
+        <artifactId>hbase-client</artifactId>
+        <version>${hbase2.version}</version>
+      </dependency>
+
+      <dependency>
+        <groupId>org.apache.hbase</groupId>
+        <artifactId>hbase-client</artifactId>
+        <version>${hbase2.version}</version>
+        <type>test-jar</type>
+      </dependency>
+
+      <dependency>
+        <groupId>org.apache.hbase</groupId>
+        <artifactId>hbase-server</artifactId>
+        <version>${hbase2.version}</version>
+      </dependency>
+
+      <dependency>
+        <groupId>org.apache.hbase</groupId>
+        <artifactId>hbase-server</artifactId>
+        <version>${hbase2.version}</version>
+        <type>test-jar</type>
+      </dependency>
+
+      <dependency>
+        <groupId>org.apache.hbase</groupId>
+        <artifactId>hbase-testing-util</artifactId>
+        <version>${hbase2.version}</version>
+      </dependency>
+
+      <!-- Jetty version needed by HBase2 tests -->
+      <dependency>
+        <groupId>org.eclipse.jetty</groupId>
+        <artifactId>jetty-servlet</artifactId>
+        <version>${hbase2.jetty.version}</version>
+        <scope>test</scope>
+      </dependency>
+
+      <dependency>
+        <groupId>org.eclipse.jetty</groupId>
+        <artifactId>jetty-util</artifactId>
+        <version>${hbase2.jetty.version}</version>
+        <scope>test</scope>
+      </dependency>
+
+      <dependency>
+        <groupId>org.eclipse.jetty</groupId>
+        <artifactId>jetty-server</artifactId>
+        <version>${hbase2.jetty.version}</version>
+        <scope>test</scope>
+      </dependency>
+    </dependencies>
+  </dependencyManagement>
+  <dependencies>
+
+    <dependency>
+      <groupId>org.apache.flume</groupId>
+      <artifactId>flume-ng-sdk</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.flume</groupId>
+      <artifactId>flume-ng-core</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.flume</groupId>
+      <artifactId>flume-ng-configuration</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>${hadoop.common.artifact.id}</artifactId>
+      <optional>true</optional>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-client</artifactId>
+      <optional>true</optional>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.logging.log4j</groupId>
+      <artifactId>log4j-api</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.logging.log4j</groupId>
+      <artifactId>log4j-core</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.logging.log4j</groupId>
+      <artifactId>log4j-slf4j-impl</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.logging.log4j</groupId>
+      <artifactId>log4j-1.2-api</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>commons-io</groupId>
+      <artifactId>commons-io</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>commons-lang</groupId>
+      <artifactId>commons-lang</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-minicluster</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-testing-util</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.zookeeper</groupId>
+      <artifactId>zookeeper</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+  </dependencies>
+
+</project>

http://git-wip-us.apache.org/repos/asf/flume/blob/719afe90/flume-ng-sinks/flume-ng-hbase2-sink/src/main/java/org/apache/flume/sink/hbase2/BatchAware.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase2-sink/src/main/java/org/apache/flume/sink/hbase2/BatchAware.java b/flume-ng-sinks/flume-ng-hbase2-sink/src/main/java/org/apache/flume/sink/hbase2/BatchAware.java
new file mode 100644
index 0000000..f184fec
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-hbase2-sink/src/main/java/org/apache/flume/sink/hbase2/BatchAware.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink.hbase2;
+
+/**
+ * This interface allows for implementing HBase serializers that are aware of
+ * batching. {@link #onBatchStart()} is called at the beginning of each batch
+ * by the sink.
+ */
+public interface BatchAware {
+  void onBatchStart();
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/719afe90/flume-ng-sinks/flume-ng-hbase2-sink/src/main/java/org/apache/flume/sink/hbase2/HBase2EventSerializer.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase2-sink/src/main/java/org/apache/flume/sink/hbase2/HBase2EventSerializer.java b/flume-ng-sinks/flume-ng-hbase2-sink/src/main/java/org/apache/flume/sink/hbase2/HBase2EventSerializer.java
new file mode 100644
index 0000000..3fb64a4
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-hbase2-sink/src/main/java/org/apache/flume/sink/hbase2/HBase2EventSerializer.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink.hbase2;
+
+import java.util.List;
+
+import org.apache.flume.Event;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.conf.ConfigurableComponent;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Row;
+
+/**
+ * Interface for an event serializer which serializes the headers and body
+ * of an event to write them to HBase 2. This is configurable, so any config
+ * params required should be taken through this. Only the column family is
+ * passed in. The columns should exist in the table and column family
+ * specified in the configuration for the HBase2Sink.
+ */
+public interface HBase2EventSerializer extends Configurable, ConfigurableComponent {
+  /**
+   * Initialize the event serializer.
+   * @param event Event to be written to HBase
+   * @param columnFamily Column family to write to
+   */
+  void initialize(Event event, byte[] columnFamily);
+
+  /**
+   * Get the actions that should be written out to hbase as a result of this
+   * event. This list is written to HBase using the HBase batch API.
+   * @return List of {@link org.apache.hadoop.hbase.client.Row} which
+   * are written as such to HBase.
+   *
+   * Historical note: in HBase 0.92, Increment did not implement Row,
+   * which is why increments are returned separately via getIncrements().
+   *
+   */
+  List<Row> getActions();
+
+  List<Increment> getIncrements();
+
+  /**
+   * Clean up any state. This will be called when the sink is being stopped.
+   */
+  void close();
+}

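To make the contract concrete, here is a sketch of a custom serializer
implementing both HBase2EventSerializer and the BatchAware interface above.
The class name, row-key scheme, and column name are hypothetical, invented
for illustration; they are not part of this commit:

  package org.apache.flume.sink.hbase2;

  import java.nio.charset.StandardCharsets;
  import java.util.ArrayList;
  import java.util.List;

  import org.apache.flume.Context;
  import org.apache.flume.Event;
  import org.apache.flume.conf.ComponentConfiguration;
  import org.apache.hadoop.hbase.client.Increment;
  import org.apache.hadoop.hbase.client.Put;
  import org.apache.hadoop.hbase.client.Row;

  public class BatchCountingSerializer implements HBase2EventSerializer, BatchAware {
    private byte[] cf;
    private byte[] body;
    private int seqInBatch;  // position of the current event within the batch

    @Override
    public void onBatchStart() {
      seqInBatch = 0;        // reset per-batch state; called by the sink
    }

    @Override
    public void configure(Context context) { }

    @Override
    public void configure(ComponentConfiguration conf) { }

    @Override
    public void initialize(Event event, byte[] columnFamily) {
      this.body = event.getBody();
      this.cf = columnFamily;
      this.seqInBatch++;
    }

    @Override
    public List<Row> getActions() {
      // One Put per event; the row key embeds the per-batch sequence number.
      Put put = new Put(("row-" + System.currentTimeMillis() + "-" + seqInBatch)
          .getBytes(StandardCharsets.UTF_8));
      put.addColumn(cf, "pCol".getBytes(StandardCharsets.UTF_8), body);
      List<Row> actions = new ArrayList<>();
      actions.add(put);
      return actions;
    }

    @Override
    public List<Increment> getIncrements() {
      return new ArrayList<>();  // no counter columns in this sketch
    }

    @Override
    public void close() { }
  }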
http://git-wip-us.apache.org/repos/asf/flume/blob/719afe90/flume-ng-sinks/flume-ng-hbase2-sink/src/main/java/org/apache/flume/sink/hbase2/HBase2Sink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase2-sink/src/main/java/org/apache/flume/sink/hbase2/HBase2Sink.java b/flume-ng-sinks/flume-ng-hbase2-sink/src/main/java/org/apache/flume/sink/hbase2/HBase2Sink.java
new file mode 100644
index 0000000..a62d27e
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-hbase2-sink/src/main/java/org/apache/flume/sink/hbase2/HBase2Sink.java
@@ -0,0 +1,548 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink.hbase2;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Charsets;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.EventDeliveryException;
+import org.apache.flume.FlumeException;
+import org.apache.flume.Transaction;
+import org.apache.flume.annotations.InterfaceAudience;
+import org.apache.flume.auth.FlumeAuthenticationUtil;
+import org.apache.flume.auth.PrivilegedExecutor;
+import org.apache.flume.conf.Configurable;
+import org.apache.flume.conf.ConfigurationException;
+import org.apache.flume.instrumentation.SinkCounter;
+import org.apache.flume.sink.AbstractSink;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.BufferedMutator;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Row;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.VersionInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+
+/**
+ * A simple sink which reads events from a channel and writes them to HBase 2.
+ * The HBase configuration is picked up from the first <tt>hbase-site.xml</tt>
+ * encountered in the classpath. This sink supports batch reading of
+ * events from the channel, and writing them to HBase, to minimize the number
+ * of flushes on the HBase tables. To use this sink, it has to be configured
+ * with certain mandatory parameters:<p>
+ * <tt>table: </tt> The name of the table in HBase to write to. <p>
+ * <tt>columnFamily: </tt> The column family in HBase to write to.<p>
+ * This sink will commit each transaction if the table's write buffer size is
+ * reached or if the number of events in the current transaction reaches the
+ * batch size, whichever comes first.<p>
+ * Other optional parameters are:<p>
+ * <tt>serializer:</tt> A class implementing {@link HBase2EventSerializer}.
+ * An instance of
+ * this class will be used to write out events to HBase.<p>
+ * <tt>serializer.*:</tt> Passed in the configure() method to serializer
+ * as an object of {@link org.apache.flume.Context}.<p>
+ * <tt>batchSize: </tt>This is the batch size used by the client. This is the
+ * maximum number of events the sink will commit per transaction. The default
+ * batch size is 100 events.
+ * <p>
+ * <p>
+ * <strong>Note: </strong> While this sink flushes all events in a transaction
+ * to HBase in one shot, HBase does not guarantee atomic commits on multiple
+ * rows. So if a subset of events in a batch are written to disk by HBase and
+ * HBase fails, the flume transaction is rolled back, causing flume to write
+ * all the events in the transaction all over again, which will cause
+ * duplicates. The serializer is expected to take care of the handling of
+ * duplicates etc. HBase also does not support batch increments, so if
+ * multiple increments are returned by the serializer, then HBase failure
+ * will cause them to be re-written, when HBase comes back up.
+ */
+public class HBase2Sink extends AbstractSink implements Configurable {
+  private String tableName;
+  private byte[] columnFamily;
+  private Connection conn;
+  private BufferedMutator table;
+  private long batchSize;
+  private final Configuration config;
+  private static final Logger logger = LoggerFactory.getLogger(HBase2Sink.class);
+  private HBase2EventSerializer serializer;
+  private String kerberosPrincipal;
+  private String kerberosKeytab;
+  private boolean enableWal = true;
+  private boolean batchIncrements = false;
+  private SinkCounter sinkCounter;
+  private PrivilegedExecutor privilegedExecutor;
+
+  // Internal hooks used for unit testing.
+  private DebugIncrementsCallback debugIncrCallback = null;
+
+  public HBase2Sink() {
+    this(HBaseConfiguration.create());
+  }
+
+  public HBase2Sink(Configuration conf) {
+    this.config = conf;
+  }
+
+  @VisibleForTesting
+  @InterfaceAudience.Private
+  HBase2Sink(Configuration conf, DebugIncrementsCallback cb) {
+    this(conf);
+    this.debugIncrCallback = cb;
+  }
+
+  @Override
+  public void start() {
+    Preconditions.checkArgument(table == null, "Please call stop " +
+        "before calling start on an old instance.");
+    try {
+      privilegedExecutor =
+          FlumeAuthenticationUtil.getAuthenticator(kerberosPrincipal, kerberosKeytab);
+    } catch (Exception ex) {
+      sinkCounter.incrementConnectionFailedCount();
+      throw new FlumeException("Failed to login to HBase using "
+          + "provided credentials.", ex);
+    }
+    try {
+      conn = privilegedExecutor.execute((PrivilegedExceptionAction<Connection>) () -> {
+        conn = ConnectionFactory.createConnection(config);
+        return conn;
+      });
+      // Flush is controlled by us. This ensures that HBase changing
+      // their criteria for flushing does not change how we flush.
+      table = conn.getBufferedMutator(TableName.valueOf(tableName));
+
+    } catch (Exception e) {
+      sinkCounter.incrementConnectionFailedCount();
+      logger.error("Could not load table, " + tableName +
+          " from HBase", e);
+      throw new FlumeException("Could not load table, " + tableName +
+          " from HBase", e);
+    }
+    try {
+      if (!privilegedExecutor.execute((PrivilegedExceptionAction<Boolean>) () -> {
+        Table t = null;
+        try {
+          t = conn.getTable(TableName.valueOf(tableName));
+          return t.getTableDescriptor().hasFamily(columnFamily);
+        } finally {
+          if (t != null) {
+            t.close();
+          }
+        }
+      })) {
+        throw new IOException("Table " + tableName
+            + " has no such column family " + Bytes.toString(columnFamily));
+      }
+    } catch (Exception e) {
+      //getTableDescriptor() also throws IOException, so catch the IOException
+      //thrown above or by the getTableDescriptor() call.
+      sinkCounter.incrementConnectionFailedCount();
+      throw new FlumeException("Error getting column family from HBase."
+          + "Please verify that the table " + tableName + " and Column Family, "
+          + Bytes.toString(columnFamily) + " exists in HBase, and the"
+          + " current user has permissions to access that table.", e);
+    }
+
+    super.start();
+    sinkCounter.incrementConnectionCreatedCount();
+    sinkCounter.start();
+  }
+
+  @Override
+  public void stop() {
+    try {
+      if (table != null) {
+        table.close();
+      }
+      table = null;
+    } catch (IOException e) {
+      throw new FlumeException("Error closing table.", e);
+    }
+    try {
+      if (conn != null) {
+        conn.close();
+      }
+      conn = null;
+    } catch (IOException e) {
+      throw new FlumeException("Error closing connection.", e);
+    }
+    sinkCounter.incrementConnectionClosedCount();
+    sinkCounter.stop();
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public void configure(Context context) {
+    if (!this.hasVersionAtLeast2()) {
+      throw new ConfigurationException(
+              "HBase major version number must be at least 2 for hbase2sink");
+    }
+
+    tableName = context.getString(HBase2SinkConfigurationConstants.CONFIG_TABLE);
+    String cf = context.getString(
+        HBase2SinkConfigurationConstants.CONFIG_COLUMN_FAMILY);
+    batchSize = context.getLong(
+        HBase2SinkConfigurationConstants.CONFIG_BATCHSIZE, 100L);
+    Context serializerContext = new Context();
+    //If not specified, the default serializer will be used (set below).
+    String eventSerializerType = context.getString(
+            HBase2SinkConfigurationConstants.CONFIG_SERIALIZER);
+    Preconditions.checkNotNull(tableName,
+        "Table name cannot be empty, please specify in configuration file");
+    Preconditions.checkNotNull(cf,
+        "Column family cannot be empty, please specify in configuration file");
+    //Check for an event serializer; if unset, fall back to the default type
+    if (eventSerializerType == null || eventSerializerType.isEmpty()) {
+      eventSerializerType =
+          "org.apache.flume.sink.hbase2.SimpleHBase2EventSerializer";
+      logger.info("No serializer defined, Will use default");
+    }
+    serializerContext.putAll(context.getSubProperties(
+        HBase2SinkConfigurationConstants.CONFIG_SERIALIZER_PREFIX));
+    columnFamily = cf.getBytes(Charsets.UTF_8);
+    try {
+      Class<? extends HBase2EventSerializer> clazz =
+          (Class<? extends HBase2EventSerializer>)
+              Class.forName(eventSerializerType);
+      serializer = clazz.newInstance();
+      serializer.configure(serializerContext);
+    } catch (Exception e) {
+      logger.error("Could not instantiate event serializer.", e);
+      Throwables.propagate(e);
+    }
+    kerberosKeytab = context.getString(HBase2SinkConfigurationConstants.CONFIG_KEYTAB);
+    kerberosPrincipal = context.getString(HBase2SinkConfigurationConstants.CONFIG_PRINCIPAL);
+
+    enableWal = context.getBoolean(HBase2SinkConfigurationConstants
+        .CONFIG_ENABLE_WAL, HBase2SinkConfigurationConstants.DEFAULT_ENABLE_WAL);
+    logger.info("The write to WAL option is set to: " + String.valueOf(enableWal));
+    if (!enableWal) {
+      logger.warn("HBase Sink's enableWal configuration is set to false. All " +
+          "writes to HBase will have WAL disabled, and any data in the " +
+          "memstore of this region in the Region Server could be lost!");
+    }
+
+    batchIncrements = context.getBoolean(
+        HBase2SinkConfigurationConstants.CONFIG_COALESCE_INCREMENTS,
+        HBase2SinkConfigurationConstants.DEFAULT_COALESCE_INCREMENTS);
+
+    if (batchIncrements) {
+      logger.info("Increment coalescing is enabled. Increments will be " +
+          "buffered.");
+    }
+
+    String zkQuorum = context.getString(HBase2SinkConfigurationConstants
+        .ZK_QUORUM);
+    Integer port = null;
+    /*
+     * HBase allows multiple nodes in the quorum, but all need to use the
+     * same client port. So get the nodes in host:port format,
+     * and ignore the ports for all nodes except the first one. If no port is
+     * specified, use default.
+     */
+    if (zkQuorum != null && !zkQuorum.isEmpty()) {
+      StringBuilder zkBuilder = new StringBuilder();
+      logger.info("Using ZK Quorum: " + zkQuorum);
+      String[] zkHosts = zkQuorum.split(",");
+      int length = zkHosts.length;
+      for (int i = 0; i < length; i++) {
+        String[] zkHostAndPort = zkHosts[i].split(":");
+        zkBuilder.append(zkHostAndPort[0].trim());
+        if (i != length - 1) {
+          zkBuilder.append(",");
+        } else {
+          zkQuorum = zkBuilder.toString();
+        }
+        if (zkHostAndPort.length < 2) {
+          throw new FlumeException("Expected client port for the ZK node!");
+        }
+        if (port == null) {
+          port = Integer.parseInt(zkHostAndPort[1].trim());
+        } else if (!port.equals(Integer.parseInt(zkHostAndPort[1].trim()))) {
+          throw new FlumeException("All Zookeeper nodes in the quorum must " +
+              "use the same client port.");
+        }
+      }
+      if (port == null) {
+        port = HConstants.DEFAULT_ZOOKEPER_CLIENT_PORT;
+      }
+      this.config.set(HConstants.ZOOKEEPER_QUORUM, zkQuorum);
+      this.config.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, port);
+    }
+    String hbaseZnode = context.getString(
+        HBase2SinkConfigurationConstants.ZK_ZNODE_PARENT);
+    if (hbaseZnode != null && !hbaseZnode.isEmpty()) {
+      this.config.set(HConstants.ZOOKEEPER_ZNODE_PARENT, hbaseZnode);
+    }
+    sinkCounter = new SinkCounter(this.getName());
+  }
+
+  public Configuration getConfig() {
+    return config;
+  }
+
+  @Override
+  public Status process() throws EventDeliveryException {
+    Status status = Status.READY;
+    Channel channel = getChannel();
+    Transaction txn = channel.getTransaction();
+    List<Row> actions = new LinkedList<>();
+    List<Increment> incs = new LinkedList<>();
+    try {
+      txn.begin();
+
+      if (serializer instanceof BatchAware) {
+        ((BatchAware) serializer).onBatchStart();
+      }
+
+      long i = 0;
+      for (; i < batchSize; i++) {
+        Event event = channel.take();
+        if (event == null) {
+          if (i == 0) {
+            status = Status.BACKOFF;
+            sinkCounter.incrementBatchEmptyCount();
+          } else {
+            sinkCounter.incrementBatchUnderflowCount();
+          }
+          break;
+        } else {
+          serializer.initialize(event, columnFamily);
+          actions.addAll(serializer.getActions());
+          incs.addAll(serializer.getIncrements());
+        }
+      }
+      if (i == batchSize) {
+        sinkCounter.incrementBatchCompleteCount();
+      }
+      sinkCounter.addToEventDrainAttemptCount(i);
+
+      putEventsAndCommit(actions, incs, txn);
+
+    } catch (Throwable e) {
+      try {
+        txn.rollback();
+      } catch (Exception e2) {
+        logger.error("Exception in rollback. Rollback might not have been " +
+            "successful.", e2);
+      }
+      logger.error("Failed to commit transaction. " +
+          "Transaction rolled back.", e);
+      if (e instanceof Error || e instanceof RuntimeException) {
+        Throwables.propagate(e);
+      } else {
+        throw new EventDeliveryException("Failed to commit transaction. " +
+            "Transaction rolled back.", e);
+      }
+    } finally {
+      txn.close();
+    }
+    return status;
+  }
+
+  private void putEventsAndCommit(final List<Row> actions,
+                                  final List<Increment> incs, Transaction txn) throws Exception {
+
+    privilegedExecutor.execute((PrivilegedExceptionAction<Void>) () -> {
+      final List<Mutation> mutations = new ArrayList<>(actions.size());
+      for (Row r : actions) {
+        if (r instanceof Put) {
+          ((Put) r).setDurability(enableWal ? Durability.USE_DEFAULT : Durability.SKIP_WAL);
+        }
+        // Newer versions of HBase - Increment implements Row.
+        if (r instanceof Increment) {
+          ((Increment) r).setDurability(enableWal ? Durability.USE_DEFAULT : Durability.SKIP_WAL);
+        }
+        if (r instanceof Mutation) {
+          mutations.add((Mutation)r);
+        } else {
+          logger.warn("dropping row " + r + " since it is not an Increment or Put");
+        }
+      }
+      table.mutate(mutations);
+      table.flush();
+      return null;
+    });
+
+    privilegedExecutor.execute((PrivilegedExceptionAction<Void>) () -> {
+
+      List<Increment> processedIncrements;
+      if (batchIncrements) {
+        processedIncrements = coalesceIncrements(incs);
+      } else {
+        processedIncrements = incs;
+      }
+
+      // Only used for unit testing.
+      if (debugIncrCallback != null) {
+        debugIncrCallback.onAfterCoalesce(processedIncrements);
+      }
+
+      for (final Increment i : processedIncrements) {
+        i.setDurability(enableWal ? Durability.USE_DEFAULT : Durability.SKIP_WAL);
+        table.mutate(i);
+      }
+      table.flush();
+      return null;
+    });
+
+    txn.commit();
+    sinkCounter.addToEventDrainSuccessCount(actions.size());
+  }
+
+
+  @SuppressWarnings("unchecked")
+  private Map<byte[], NavigableMap<byte[], Long>> getFamilyMap(Increment inc) {
+    Preconditions.checkNotNull(inc, "Increment required");
+    return inc.getFamilyMapOfLongs();
+  }
+
+  /**
+   * Perform "compression" on the given set of increments so that Flume sends
+   * the minimum possible number of RPC operations to HBase per batch.
+   *
+   * @param incs Input: Increment objects to coalesce.
+   * @return List of new Increment objects after coalescing the unique counts.
+   */
+  private List<Increment> coalesceIncrements(Iterable<Increment> incs) {
+    Preconditions.checkNotNull(incs, "List of Increments must not be null");
+    // Aggregate all of the increment row/family/column counts.
+    // The nested map is keyed like this: {row, family, qualifier} => count.
+    Map<byte[], Map<byte[], NavigableMap<byte[], Long>>> counters = 
+        Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
+    for (Increment inc : incs) {
+      byte[] row = inc.getRow();
+      Map<byte[], NavigableMap<byte[], Long>> families = getFamilyMap(inc);
+      for (Map.Entry<byte[], NavigableMap<byte[], Long>> familyEntry : families.entrySet()) {
+        byte[] family = familyEntry.getKey();
+        NavigableMap<byte[], Long> qualifiers = familyEntry.getValue();
+        for (Map.Entry<byte[], Long> qualifierEntry : qualifiers.entrySet()) {
+          byte[] qualifier = qualifierEntry.getKey();
+          Long count = qualifierEntry.getValue();
+          incrementCounter(counters, row, family, qualifier, count);
+        }
+      }
+    }
+
+    // Reconstruct list of Increments per unique row/family/qualifier.
+    List<Increment> coalesced = Lists.newLinkedList();
+    for (Map.Entry<byte[], Map<byte[], NavigableMap<byte[], Long>>> rowEntry :
+         counters.entrySet()) {
+      byte[] row = rowEntry.getKey();
+      Map<byte[], NavigableMap<byte[], Long>> families = rowEntry.getValue();
+      Increment inc = new Increment(row);
+      for (Map.Entry<byte[], NavigableMap<byte[], Long>> familyEntry : families.entrySet()) {
+        byte[] family = familyEntry.getKey();
+        NavigableMap<byte[], Long> qualifiers = familyEntry.getValue();
+        for (Map.Entry<byte[], Long> qualifierEntry : qualifiers.entrySet()) {
+          byte[] qualifier = qualifierEntry.getKey();
+          long count = qualifierEntry.getValue();
+          inc.addColumn(family, qualifier, count);
+        }
+      }
+      coalesced.add(inc);
+    }
+
+    return coalesced;
+  }
+
+  /**
+   * Helper function for {@link #coalesceIncrements} to increment a counter
+   * value in the passed data structure.
+   *
+   * @param counters  Nested data structure containing the counters.
+   * @param row       Row key to increment.
+   * @param family    Column family to increment.
+   * @param qualifier Column qualifier to increment.
+   * @param count     Amount to increment by.
+   */
+  private void incrementCounter(
+      Map<byte[], Map<byte[], NavigableMap<byte[], Long>>> counters,
+      byte[] row, byte[] family, byte[] qualifier, Long count) {
+
+    Map<byte[], NavigableMap<byte[], Long>> families =
+            counters.computeIfAbsent(row, k -> Maps.newTreeMap(Bytes.BYTES_COMPARATOR));
+
+    NavigableMap<byte[], Long> qualifiers =
+            families.computeIfAbsent(family, k -> Maps.newTreeMap(Bytes.BYTES_COMPARATOR));
+
+    qualifiers.merge(qualifier, count, (a, b) -> a + b);
+  }
+
+  String getHBbaseVersionString() {
+    return VersionInfo.getVersion();
+  }
+
+  private int getMajorVersion(String version) throws NumberFormatException {
+    return Integer.parseInt(version.split("\\.")[0]);
+  }
+
+  private boolean hasVersionAtLeast2() {
+    String version = getHBbaseVersionString();
+    try {
+      if (this.getMajorVersion(version) >= 2) {
+        return true;
+      }
+    } catch (NumberFormatException ex) {
+      logger.error(ex.getMessage());
+    }
+    logger.error("Invalid HBase version for hbase2sink:" + version);
+    return false;
+  }
+
+  @VisibleForTesting
+  @InterfaceAudience.Private
+  HBase2EventSerializer getSerializer() {
+    return serializer;
+  }
+
+  @VisibleForTesting
+  @InterfaceAudience.Private
+  interface DebugIncrementsCallback {
+    void onAfterCoalesce(Iterable<Increment> increments);
+  }
+}

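As a worked example of the coalescing logic above: if a batch yields three
Increments of 1 on the same row/family/qualifier, coalesceIncrements() sends
a single Increment of 3, so one RPC instead of three. A standalone sketch of
the same aggregation, simplified to String keys (HBase2Sink itself keys
byte[] maps with Bytes.BYTES_COMPARATOR); class name and data are made up:

  import java.util.HashMap;
  import java.util.Map;

  public class CoalesceDemo {
    public static void main(String[] args) {
      // Nested map mirrors the sink's counters: {row -> {family:qualifier -> count}}
      Map<String, Map<String, Long>> counters = new HashMap<>();
      String[][] incs = {
          {"row1", "cf:hits", "1"},
          {"row1", "cf:hits", "1"},
          {"row1", "cf:hits", "1"},
          {"row2", "cf:hits", "5"},
      };
      for (String[] inc : incs) {
        counters.computeIfAbsent(inc[0], k -> new HashMap<>())
                .merge(inc[1], Long.parseLong(inc[2]), Long::sum);
      }
      // Four input increments collapse to two:
      // {row1={cf:hits=3}, row2={cf:hits=5}}
      System.out.println(counters);
    }
  }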
http://git-wip-us.apache.org/repos/asf/flume/blob/719afe90/flume-ng-sinks/flume-ng-hbase2-sink/src/main/java/org/apache/flume/sink/hbase2/HBase2SinkConfigurationConstants.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase2-sink/src/main/java/org/apache/flume/sink/hbase2/HBase2SinkConfigurationConstants.java b/flume-ng-sinks/flume-ng-hbase2-sink/src/main/java/org/apache/flume/sink/hbase2/HBase2SinkConfigurationConstants.java
new file mode 100644
index 0000000..15aa9e6
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-hbase2-sink/src/main/java/org/apache/flume/sink/hbase2/HBase2SinkConfigurationConstants.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink.hbase2;
+
+import org.apache.hadoop.hbase.HConstants;
+
+/**
+ * Constants used for configuration of HBase2Sink
+ *
+ */
+public class HBase2SinkConfigurationConstants {
+  /**
+   * The HBase table which the sink should write to.
+   */
+  public static final String CONFIG_TABLE = "table";
+  /**
+   * The column family which the sink should use.
+   */
+  public static final String CONFIG_COLUMN_FAMILY = "columnFamily";
+  /**
+   * Maximum number of events the sink should take from the channel per
+   * transaction, if available.
+   */
+  public static final String CONFIG_BATCHSIZE = "batchSize";
+  /**
+   * The fully qualified class name of the serializer the sink should use.
+   */
+  public static final String CONFIG_SERIALIZER = "serializer";
+  /**
+   * Configuration to pass to the serializer.
+   */
+  public static final String CONFIG_SERIALIZER_PREFIX = CONFIG_SERIALIZER + ".";
+
+  public static final String CONFIG_TIMEOUT = "timeout";
+
+  public static final String CONFIG_ENABLE_WAL = "enableWal";
+
+  public static final boolean DEFAULT_ENABLE_WAL = true;
+
+  public static final long DEFAULT_TIMEOUT = 60000;
+
+  public static final String CONFIG_KEYTAB = "kerberosKeytab";
+
+  public static final String CONFIG_PRINCIPAL = "kerberosPrincipal";
+
+  public static final String ZK_QUORUM = "zookeeperQuorum";
+
+  public static final String ZK_ZNODE_PARENT = "znodeParent";
+
+  public static final String DEFAULT_ZK_ZNODE_PARENT =
+      HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT;
+
+  public static final String CONFIG_COALESCE_INCREMENTS = "coalesceIncrements";
+
+  public static final Boolean DEFAULT_COALESCE_INCREMENTS = false;
+
+  public static final int DEFAULT_MAX_CONSECUTIVE_FAILS = 10;
+
+  public static final String CONFIG_MAX_CONSECUTIVE_FAILS = "maxConsecutiveFails";
+
+}

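These constants map one-to-one onto the agent configuration keys shown in
the user guide section above (table, columnFamily, batchSize, serializer,
and so on). A sketch of wiring them up programmatically, e.g. from a test;
the class name and values are illustrative assumptions:

  import org.apache.flume.Context;
  import org.apache.flume.sink.hbase2.HBase2Sink;
  import org.apache.flume.sink.hbase2.HBase2SinkConfigurationConstants;

  public class ConfigureSketch {
    public static void main(String[] args) {
      Context ctx = new Context();
      ctx.put(HBase2SinkConfigurationConstants.CONFIG_TABLE, "foo_table");
      ctx.put(HBase2SinkConfigurationConstants.CONFIG_COLUMN_FAMILY, "bar_cf");
      ctx.put(HBase2SinkConfigurationConstants.CONFIG_BATCHSIZE, "100");
      ctx.put(HBase2SinkConfigurationConstants.CONFIG_COALESCE_INCREMENTS, "true");

      HBase2Sink sink = new HBase2Sink();
      // configure() first checks VersionInfo.getVersion(), so an HBase 2.x
      // client must be on the classpath or a ConfigurationException is thrown.
      sink.configure(ctx);
    }
  }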
http://git-wip-us.apache.org/repos/asf/flume/blob/719afe90/flume-ng-sinks/flume-ng-hbase2-sink/src/main/java/org/apache/flume/sink/hbase2/RegexHBase2EventSerializer.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase2-sink/src/main/java/org/apache/flume/sink/hbase2/RegexHBase2EventSerializer.java b/flume-ng-sinks/flume-ng-hbase2-sink/src/main/java/org/apache/flume/sink/hbase2/RegexHBase2EventSerializer.java
new file mode 100644
index 0000000..089795e
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-hbase2-sink/src/main/java/org/apache/flume/sink/hbase2/RegexHBase2EventSerializer.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink.hbase2;
+
+import com.google.common.base.Charsets;
+import com.google.common.collect.Lists;
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.FlumeException;
+import org.apache.flume.conf.ComponentConfiguration;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Row;
+
+import java.nio.charset.Charset;
+import java.util.Calendar;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * An {@link HBase2EventSerializer} which parses columns based on a supplied
+ * regular expression and column name list.
+ * <p>
+ * Note that if the regular expression does not return the correct number of
+ * groups for a particular event, or it does not correctly match an event,
+ * the event is silently dropped.
+ * <p>
+ * Row keys for each event consist of a timestamp concatenated with an
+ * identifier which enforces uniqueness of keys across flume agents.
+ * <p>
+ * See static constant variables for configuration options.
+ */
+public class RegexHBase2EventSerializer implements HBase2EventSerializer {
+  // Config vars
+  /** Regular expression used to parse groups from event data. */
+  public static final String REGEX_CONFIG = "regex";
+  public static final String REGEX_DEFAULT = "(.*)";
+
+  /** Whether to ignore case when performing regex matches. */
+  public static final String IGNORE_CASE_CONFIG = "regexIgnoreCase";
+  public static final boolean IGNORE_CASE_DEFAULT = false;
+
+  /** Comma separated list of column names to place matching groups in. */
+  public static final String COL_NAME_CONFIG = "colNames";
+  public static final String COLUMN_NAME_DEFAULT = "payload";
+
+  /** Index of the row key in matched regex groups */
+  public static final String ROW_KEY_INDEX_CONFIG = "rowKeyIndex";
+
+  /** Placeholder in colNames for row key */
+  public static final String ROW_KEY_NAME = "ROW_KEY";
+
+  /** Whether to deposit event headers into corresponding column qualifiers */
+  public static final String DEPOSIT_HEADERS_CONFIG = "depositHeaders";
+  public static final boolean DEPOSIT_HEADERS_DEFAULT = false;
+
+  /** What charset to use when serializing into HBase's byte arrays */
+  public static final String CHARSET_CONFIG = "charset";
+  public static final String CHARSET_DEFAULT = "UTF-8";
+
+  /* This is a nonce used in HBase row-keys, such that the same row-key
+   * never gets written more than once from within this JVM. */
+  protected static final AtomicInteger nonce = new AtomicInteger(0);
+  protected static final String randomKey = RandomStringUtils.randomAlphanumeric(10);
+
+  protected byte[] cf;
+  private byte[] payload;
+  private final List<byte[]> colNames = Lists.newArrayList();
+  private Map<String, String> headers;
+  private boolean depositHeaders;
+  private Pattern inputPattern;
+  private Charset charset;
+  private int rowKeyIndex;
+
+  @Override
+  public void configure(Context context) {
+    String regex = context.getString(REGEX_CONFIG, REGEX_DEFAULT);
+    boolean regexIgnoreCase = context.getBoolean(IGNORE_CASE_CONFIG,
+            IGNORE_CASE_DEFAULT);
+    depositHeaders = context.getBoolean(DEPOSIT_HEADERS_CONFIG,
+        DEPOSIT_HEADERS_DEFAULT);
+    inputPattern = Pattern.compile(regex, Pattern.DOTALL
+        + (regexIgnoreCase ? Pattern.CASE_INSENSITIVE : 0));
+    charset = Charset.forName(context.getString(CHARSET_CONFIG,
+        CHARSET_DEFAULT));
+
+    String colNameStr = context.getString(COL_NAME_CONFIG, COLUMN_NAME_DEFAULT);
+    String[] columnNames = colNameStr.split(",");
+    for (String s : columnNames) {
+      colNames.add(s.getBytes(charset));
+    }
+
+    //Row key index is optional; default is -1
+    rowKeyIndex = context.getInteger(ROW_KEY_INDEX_CONFIG, -1);
+    //if a row key index is configured, make sure it is specified correctly
+    if (rowKeyIndex >= 0) {
+      if (rowKeyIndex >= columnNames.length) {
+        throw new IllegalArgumentException(ROW_KEY_INDEX_CONFIG + " must be " +
+            "less than num columns " + columnNames.length);
+      }
+      if (!ROW_KEY_NAME.equalsIgnoreCase(columnNames[rowKeyIndex])) {
+        throw new IllegalArgumentException("Column at " + rowKeyIndex + " must be "
+            + ROW_KEY_NAME + " and is " + columnNames[rowKeyIndex]);
+      }
+    }
+  }
+
+  @Override
+  public void configure(ComponentConfiguration conf) {
+  }
+
+  @Override
+  public void initialize(Event event, byte[] columnFamily) {
+    this.headers = event.getHeaders();
+    this.payload = event.getBody();
+    this.cf = columnFamily;
+  }
+
+  /**
+   * Returns a row-key with the following format:
+   * [time in millis]-[random key]-[nonce]
+   */
+  protected byte[] getRowKey(Calendar cal) {
+    /* NOTE: This key generation strategy has the following properties:
+     * 
+     * 1) Within a single JVM, the same row key will never be duplicated.
+     * 2) Amongst any two JVM's operating at different time periods (according
+     *    to their respective clocks), the same row key will never be 
+     *    duplicated.
+     * 3) Amongst any two JVM's operating concurrently (according to their
+     *    respective clocks), the odds of duplicating a row-key are non-zero
+     *    but infinitesimal. This would require simultaneous collision in (a) 
+     *    the timestamp (b) the respective nonce and (c) the random string.
+     *    The string is necessary since (a) and (b) could collide if a fleet
+     *    of Flume agents are restarted in tandem.
+     *    
+     *  Row-key uniqueness is important because conflicting row-keys will cause
+     *  data loss. */
+    String rowKey = String.format("%s-%s-%s", cal.getTimeInMillis(),
+        randomKey, nonce.getAndIncrement());
+    return rowKey.getBytes(charset);
+  }
+
+  protected byte[] getRowKey() {
+    return getRowKey(Calendar.getInstance());
+  }
+
+  @Override
+  public List<Row> getActions() throws FlumeException {
+    List<Row> actions = Lists.newArrayList();
+    byte[] rowKey;
+    Matcher m = inputPattern.matcher(new String(payload, charset));
+    if (!m.matches()) {
+      return Lists.newArrayList();
+    }
+
+    if (m.groupCount() != colNames.size()) {
+      return Lists.newArrayList();
+    }
+
+    try {
+      if (rowKeyIndex < 0) {
+        rowKey = getRowKey();
+      } else {
+        rowKey = m.group(rowKeyIndex + 1).getBytes(Charsets.UTF_8);
+      }
+      Put put = new Put(rowKey);
+
+      for (int i = 0; i < colNames.size(); i++) {
+        if (i != rowKeyIndex) {
+          put.addColumn(cf, colNames.get(i), m.group(i + 1).getBytes(Charsets.UTF_8));
+        }
+      }
+      if (depositHeaders) {
+        for (Map.Entry<String, String> entry : headers.entrySet()) {
+          put.addColumn(cf, entry.getKey().getBytes(charset), entry.getValue().getBytes(charset));
+        }
+      }
+      actions.add(put);
+    } catch (Exception e) {
+      throw new FlumeException("Could not get row key!", e);
+    }
+    return actions;
+  }
+
+  @Override
+  public List<Increment> getIncrements() {
+    return Lists.newArrayList();
+  }
+
+  @Override
+  public void close() {
+  }
+}
\ No newline at end of file

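A small usage sketch of the regex serializer: with the regex "(\d+):(\w+)"
and colNames "id,name", an event body "42:flume" produces one Put with
columns cf:id=42 and cf:name=flume. The class name, regex, and values below
are illustrative, not from this commit:

  import java.nio.charset.StandardCharsets;
  import java.util.List;

  import org.apache.flume.Context;
  import org.apache.flume.Event;
  import org.apache.flume.event.EventBuilder;
  import org.apache.flume.sink.hbase2.RegexHBase2EventSerializer;
  import org.apache.hadoop.hbase.client.Row;

  public class RegexSerializerSketch {
    public static void main(String[] args) {
      Context ctx = new Context();
      ctx.put(RegexHBase2EventSerializer.REGEX_CONFIG, "(\\d+):(\\w+)");
      ctx.put(RegexHBase2EventSerializer.COL_NAME_CONFIG, "id,name");

      RegexHBase2EventSerializer serializer = new RegexHBase2EventSerializer();
      serializer.configure(ctx);

      Event event = EventBuilder.withBody("42:flume".getBytes(StandardCharsets.UTF_8));
      serializer.initialize(event, "cf".getBytes(StandardCharsets.UTF_8));
      // Yields a single Put under a generated [millis]-[random]-[nonce] row key.
      List<Row> actions = serializer.getActions();
      System.out.println(actions);
      serializer.close();
    }
  }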
http://git-wip-us.apache.org/repos/asf/flume/blob/719afe90/flume-ng-sinks/flume-ng-hbase2-sink/src/main/java/org/apache/flume/sink/hbase2/SimpleHBase2EventSerializer.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase2-sink/src/main/java/org/apache/flume/sink/hbase2/SimpleHBase2EventSerializer.java b/flume-ng-sinks/flume-ng-hbase2-sink/src/main/java/org/apache/flume/sink/hbase2/SimpleHBase2EventSerializer.java
new file mode 100644
index 0000000..8236543
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-hbase2-sink/src/main/java/org/apache/flume/sink/hbase2/SimpleHBase2EventSerializer.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flume.sink.hbase2;
+
+import com.google.common.base.Charsets;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.FlumeException;
+import org.apache.flume.conf.ComponentConfiguration;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Row;
+
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * A simple serializer that creates a Put from an event by writing the event
+ * body into a single column; the event headers are discarded. It can also
+ * update a counter row in HBase.
+ * <p>Takes optional parameters:<p>
+ * <tt>rowPrefix:</tt> The row key prefix to be used. Default: <i>default</i><p>
+ * <tt>incrementRow:</tt> The row to increment. Default: <i>incRow</i><p>
+ * <tt>suffix:</tt> <i>uuid/random/timestamp/nano.</i> Default: <i>uuid</i><p>
+ * <p>Mandatory parameters:<p>
+ * <tt>cf:</tt> Column family.<p>
+ * Columns that will not be used if set to an empty value:<p>
+ * <tt>payloadColumn:</tt> The column to write the event body into. If it is
+ * empty, event data will not be written. Default: <i>pCol</i><p>
+ * <tt>incrementColumn:</tt> The column to increment. If it is empty, no
+ * column is incremented. Default: <i>iCol</i>
+ */
+public class SimpleHBase2EventSerializer implements HBase2EventSerializer {
+  private String rowPrefix;
+  private byte[] incrementRow;
+  private byte[] cf;
+  private byte[] plCol;
+  private byte[] incCol;
+  private KeyType keyType;
+  private byte[] payload;
+
+  public SimpleHBase2EventSerializer() {
+  }
+
+  @Override
+  public void configure(Context context) {
+    rowPrefix = context.getString("rowPrefix", "default");
+    incrementRow =
+        context.getString("incrementRow", "incRow").getBytes(Charsets.UTF_8);
+    String suffix = context.getString("suffix", "uuid");
+
+    String payloadColumn = context.getString("payloadColumn", "pCol");
+    String incColumn = context.getString("incrementColumn", "iCol");
+    if (payloadColumn != null && !payloadColumn.isEmpty()) {
+      switch (suffix) {
+        case "timestamp":
+          keyType = KeyType.TS;
+          break;
+        case "random":
+          keyType = KeyType.RANDOM;
+          break;
+        case "nano":
+          keyType = KeyType.TSNANO;
+          break;
+        default:
+          keyType = KeyType.UUID;
+          break;
+      }
+      plCol = payloadColumn.getBytes(Charsets.UTF_8);
+    }
+    if (incColumn != null && !incColumn.isEmpty()) {
+      incCol = incColumn.getBytes(Charsets.UTF_8);
+    }
+  }
+
+  @Override
+  public void configure(ComponentConfiguration conf) {
+  }
+
+  @Override
+  public void initialize(Event event, byte[] cf) {
+    this.payload = event.getBody();
+    this.cf = cf;
+  }
+
+  @Override
+  public List<Row> getActions() throws FlumeException {
+    List<Row> actions = new LinkedList<>();
+    if (plCol != null) {
+      byte[] rowKey;
+      try {
+        if (keyType == KeyType.TS) {
+          rowKey = SimpleRowKeyGenerator.getTimestampKey(rowPrefix);
+        } else if (keyType == KeyType.RANDOM) {
+          rowKey = SimpleRowKeyGenerator.getRandomKey(rowPrefix);
+        } else if (keyType == KeyType.TSNANO) {
+          rowKey = SimpleRowKeyGenerator.getNanoTimestampKey(rowPrefix);
+        } else {
+          rowKey = SimpleRowKeyGenerator.getUUIDKey(rowPrefix);
+        }
+        Put put = new Put(rowKey);
+        put.addColumn(cf, plCol, payload);
+        actions.add(put);
+      } catch (Exception e) {
+        throw new FlumeException("Could not get row key!", e);
+      }
+
+    }
+    return actions;
+  }
+
+  @Override
+  public List<Increment> getIncrements() {
+    List<Increment> incs = new LinkedList<>();
+    if (incCol != null) {
+      Increment inc = new Increment(incrementRow);
+      inc.addColumn(cf, incCol, 1);
+      incs.add(inc);
+    }
+    return incs;
+  }
+
+  @Override
+  public void close() {
+  }
+
+  public enum KeyType {
+    UUID,
+    RANDOM,
+    TS,
+    TSNANO
+  }
+
+}
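
As an aside, a minimal sketch (not part of this patch) of exercising
SimpleHBase2EventSerializer in isolation, using only the context keys visible
in configure() above:

    import com.google.common.base.Charsets;
    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.flume.event.EventBuilder;
    import org.apache.flume.sink.hbase2.SimpleHBase2EventSerializer;
    import org.apache.hadoop.hbase.client.Increment;
    import org.apache.hadoop.hbase.client.Row;

    import java.util.List;

    public class SimpleSerializerSketch {
      public static void main(String[] args) {
        Context ctx = new Context();
        ctx.put("rowPrefix", "evt-");       // row key becomes evt-<uuid>
        ctx.put("suffix", "uuid");
        ctx.put("payloadColumn", "pCol");
        ctx.put("incrementColumn", "iCol"); // also bump cf:iCol on incRow

        SimpleHBase2EventSerializer serializer = new SimpleHBase2EventSerializer();
        serializer.configure(ctx);

        Event event = EventBuilder.withBody("hello hbase2", Charsets.UTF_8);
        serializer.initialize(event, "cf".getBytes(Charsets.UTF_8));

        List<Row> puts = serializer.getActions();          // one Put: cf:pCol = body
        List<Increment> incs = serializer.getIncrements(); // one Increment on incRow
        serializer.close();
      }
    }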

http://git-wip-us.apache.org/repos/asf/flume/blob/719afe90/flume-ng-sinks/flume-ng-hbase2-sink/src/main/java/org/apache/flume/sink/hbase2/SimpleRowKeyGenerator.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase2-sink/src/main/java/org/apache/flume/sink/hbase2/SimpleRowKeyGenerator.java b/flume-ng-sinks/flume-ng-hbase2-sink/src/main/java/org/apache/flume/sink/hbase2/SimpleRowKeyGenerator.java
new file mode 100644
index 0000000..5386107
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-hbase2-sink/src/main/java/org/apache/flume/sink/hbase2/SimpleRowKeyGenerator.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink.hbase2;
+
+import java.io.UnsupportedEncodingException;
+import java.util.Random;
+import java.util.UUID;
+
+/**
+ * Utility class for users to generate their own keys. Any key can be used;
+ * this is just a utility that provides a set of simple ready-made keys.
+ */
+public class SimpleRowKeyGenerator {
+
+  public static byte[] getUUIDKey(String prefix) throws UnsupportedEncodingException {
+    return (prefix + UUID.randomUUID().toString()).getBytes("UTF-8");
+  }
+
+  public static byte[] getRandomKey(String prefix) throws UnsupportedEncodingException {
+    return (prefix + String.valueOf(new Random().nextLong())).getBytes("UTF-8");
+  }
+
+  public static byte[] getTimestampKey(String prefix) throws UnsupportedEncodingException {
+    return (prefix + String.valueOf(System.currentTimeMillis())).getBytes("UTF-8");
+  }
+
+  public static byte[] getNanoTimestampKey(String prefix) throws UnsupportedEncodingException {
+    return (prefix + String.valueOf(System.nanoTime())).getBytes("UTF-8");
+  }
+}
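
For illustration, the four key shapes produced by this utility (a hypothetical
snippet assuming it runs in the same package; the suffix values shown in the
comments are examples, not fixed output):

    import java.io.UnsupportedEncodingException;

    public class RowKeySketch {
      public static void main(String[] args) throws UnsupportedEncodingException {
        byte[] k1 = SimpleRowKeyGenerator.getUUIDKey("log-");          // log-<uuid>
        byte[] k2 = SimpleRowKeyGenerator.getRandomKey("log-");        // log-<random long>
        byte[] k3 = SimpleRowKeyGenerator.getTimestampKey("log-");     // log-<epoch millis>
        byte[] k4 = SimpleRowKeyGenerator.getNanoTimestampKey("log-"); // log-<nanoTime>
      }
    }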

http://git-wip-us.apache.org/repos/asf/flume/blob/719afe90/flume-ng-sinks/flume-ng-hbase2-sink/src/test/java/org/apache/flume/sink/hbase2/IncrementHBase2Serializer.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase2-sink/src/test/java/org/apache/flume/sink/hbase2/IncrementHBase2Serializer.java b/flume-ng-sinks/flume-ng-hbase2-sink/src/test/java/org/apache/flume/sink/hbase2/IncrementHBase2Serializer.java
new file mode 100644
index 0000000..2b32db4
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-hbase2-sink/src/test/java/org/apache/flume/sink/hbase2/IncrementHBase2Serializer.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.sink.hbase2;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Charsets;
+import com.google.common.collect.Lists;
+import java.util.Collections;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.conf.ComponentConfiguration;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Row;
+
+import java.util.List;
+
+/**
+ * For Increment-related unit tests.
+ */
+class IncrementHBase2Serializer implements HBase2EventSerializer, BatchAware {
+  private Event event;
+  private byte[] family;
+  private int numBatchesStarted = 0;
+
+  @Override public void configure(Context context) { }
+  @Override public void configure(ComponentConfiguration conf) { }
+  @Override public void close() { }
+
+  @Override
+  public void initialize(Event event, byte[] columnFamily) {
+    this.event = event;
+    this.family = columnFamily;
+  }
+
+  // This class only creates Increments.
+  @Override
+  public List<Row> getActions() {
+    return Collections.emptyList();
+  }
+
+  // Treat each Event as a String, i.e. "row:qualifier".
+  @Override
+  public List<Increment> getIncrements() {
+    List<Increment> increments = Lists.newArrayList();
+    String body = new String(event.getBody(), Charsets.UTF_8);
+    String[] pieces = body.split(":");
+    String row = pieces[0];
+    String qualifier = pieces[1];
+    Increment inc = new Increment(row.getBytes(Charsets.UTF_8));
+    inc.addColumn(family, qualifier.getBytes(Charsets.UTF_8), 1L);
+    increments.add(inc);
+    return increments;
+  }
+
+  @Override
+  public void onBatchStart() {
+    numBatchesStarted++;
+  }
+
+  @VisibleForTesting
+  public int getNumBatchesStarted() {
+    return numBatchesStarted;
+  }
+}
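
To make the "row:qualifier" convention concrete, a sketch (not part of this
patch) of driving this test helper directly; it assumes the code lives in the
same package, since the class is package-private, and that the sink invokes
onBatchStart() at each batch boundary through the BatchAware hook:

    import com.google.common.base.Charsets;
    import org.apache.flume.Event;
    import org.apache.flume.event.EventBuilder;
    import org.apache.hadoop.hbase.client.Increment;

    import java.util.List;

    public class IncrementSerializerSketch {
      public static void main(String[] args) {
        IncrementHBase2Serializer serializer = new IncrementHBase2Serializer();
        serializer.onBatchStart(); // what the sink would do at a batch boundary

        // The body follows the "row:qualifier" convention parsed above.
        Event event = EventBuilder.withBody("user42:loginCount", Charsets.UTF_8);
        serializer.initialize(event, "counters".getBytes(Charsets.UTF_8));

        // One Increment: +1 on counters:loginCount for row "user42".
        List<Increment> incs = serializer.getIncrements();
        assert serializer.getNumBatchesStarted() == 1;
      }
    }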

http://git-wip-us.apache.org/repos/asf/flume/blob/719afe90/flume-ng-sinks/flume-ng-hbase2-sink/src/test/java/org/apache/flume/sink/hbase2/MockSimpleHBase2EventSerializer.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase2-sink/src/test/java/org/apache/flume/sink/hbase2/MockSimpleHBase2EventSerializer.java b/flume-ng-sinks/flume-ng-hbase2-sink/src/test/java/org/apache/flume/sink/hbase2/MockSimpleHBase2EventSerializer.java
new file mode 100644
index 0000000..6c6da71
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-hbase2-sink/src/test/java/org/apache/flume/sink/hbase2/MockSimpleHBase2EventSerializer.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flume.sink.hbase2;
+
+import java.util.List;
+
+import org.apache.flume.FlumeException;
+import org.apache.hadoop.hbase.client.Row;
+
+class MockSimpleHBase2EventSerializer extends SimpleHBase2EventSerializer {
+
+  public static boolean throwException = false;
+
+  @Override
+  public List<Row> getActions() throws FlumeException {
+    if (throwException) {
+      throw new FlumeException("Exception for testing");
+    }
+    return super.getActions();
+  }
+}
\ No newline at end of file