Posted to commits@hbase.apache.org by st...@apache.org on 2018/08/02 23:01:19 UTC

[hbase-connectors] branch master updated: HBASE-20934 Create an hbase-connectors repository; commit new kafka connect here

This is an automated email from the ASF dual-hosted git repository.

stack pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase-connectors.git


The following commit(s) were added to refs/heads/master by this push:
     new 552167c  HBASE-20934 Create an hbase-connectors repository; commit new kafka connect here
552167c is described below

commit 552167cedbcd621ed2dda213059b673487412aed
Author: Michael Stack <st...@apache.org>
AuthorDate: Thu Aug 2 15:56:37 2018 -0700

    HBASE-20934 Create an hbase-connectors repository; commit new kafka connect here
    
    First cut. No bin script to start up the proxy server yet. TODO.
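    
    Until that script exists, the intended launch, going by the usage text in
    KafkaProxy.java, looks roughly like this (broker address is a placeholder):
    
        hbase-daemon.sh start kafkaproxy -p hbasekafka -a -e \
            -b localhost:9092 -r conf/kafka-route-rules.xml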
---
 conf/kafka-route-rules.xml                         |  45 +++
 hbase-kafka-model/pom.xml                          | 220 +++++++++++
 .../src/main/avro/HbaseKafkaEvent.avro             |  30 ++
 hbase-kafka-proxy/.pom.xml.swp                     | Bin 0 -> 24576 bytes
 hbase-kafka-proxy/pom.xml                          | 258 +++++++++++++
 .../org/apache/hadoop/hbase/kafka/DropRule.java    |  29 ++
 .../hadoop/hbase/kafka/DumpToStringListener.java   | 112 ++++++
 .../hadoop/hbase/kafka/KafkaBridgeConnection.java  | 216 +++++++++++
 .../org/apache/hadoop/hbase/kafka/KafkaProxy.java  | 341 +++++++++++++++++
 .../hadoop/hbase/kafka/KafkaTableForBridge.java    | 199 ++++++++++
 .../java/org/apache/hadoop/hbase/kafka/Rule.java   | 228 ++++++++++++
 .../hadoop/hbase/kafka/TopicRoutingRules.java      | 237 ++++++++++++
 .../org/apache/hadoop/hbase/kafka/TopicRule.java   |  41 +++
 .../hadoop/hbase/kafka/ProducerForTesting.java     | 142 ++++++++
 .../apache/hadoop/hbase/kafka/TestDropRule.java    | 210 +++++++++++
 .../hadoop/hbase/kafka/TestProcessMutations.java   | 114 ++++++
 .../hadoop/hbase/kafka/TestQualifierMatching.java  |  73 ++++
 .../apache/hadoop/hbase/kafka/TestRouteRules.java  | 218 +++++++++++
 pom.xml                                            | 404 +++++++++++++++++++++
 19 files changed, 3117 insertions(+)

diff --git a/conf/kafka-route-rules.xml b/conf/kafka-route-rules.xml
new file mode 100644
index 0000000..4d31ee2
--- /dev/null
+++ b/conf/kafka-route-rules.xml
@@ -0,0 +1,45 @@
+<!-- rules used by hbase kafka proxy to route mutations to kafka topics-->
+<rules>
+<!--
+this rule would route all mutations from table default:mytable to mykafkatopic
+
+<rule action="route" table="default:mytable" topic="mykafkatopic"/>
+
+this rule would route all mutations from column family mycf of table default:mytable to
+mykafkatopic
+
+<rule action="route" table="default:mytable" columnFamily="mycf" topic="mykafkatopic"/>
+
+this rule would route all mutations from qualifier myqualifier in column family mycf of table
+default:mytable to mykafkatopic
+
+<rule action="route" table="default:mytable" columnFamily="mycf" qualifier="myqualifier"
+topic="mykafkatopic"/>
+
+this rule would route all mutations from qualifiers starting with myqualifier in column family
+mycf of table default:mytable to mykafkatopic
+
+<rule action="route" table="default:mytable" columnFamily="mycf" qualifier="myqualifier*"
+topic="mykafkatopic"/>
+
+this rule would route all mutations from qualifiers containing myqualifier in column family
+mycf of table default:mytable to mykafkatopic
+
+<rule action="route" table="default:mytable" columnFamily="mycf" qualifier="*myqualifier*"
+topic="mykafkatopic"/>
+
+when used together, these rules would route all mutations from column family mycf of table
+default:mytable to mykafkatopic except the ones from myqualifier
+
+<rule action="drop" table="default:mytable" columnFamily="mycf" qualifier="myqualifier"
+topic="mykafkatopic"/>
+<rule action="route" table="default:mytable" columnFamily="mycf" topic="mykafkatopic"/>
+
+when used together, these rules would route all mutations from column family mycf of table
+default:mytable to mykafkatopic except the ones that contain secret in the qualifier name
+
+<rule action="drop" table="default:mytable" columnFamily="mycf" qualifier="*secret*"
+topic="mykafkatopic"/>
+<rule action="route" table="default:mytable" columnFamily="mycf" topic="mykafkatopic"/>
+-->
+</rules>
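
For reference, a live rule goes directly inside the rules element rather than in the comment
block above; a minimal sketch, using the documented syntax, that routes one column family:

  <rules>
    <rule action="route" table="default:mytable" columnFamily="mycf" topic="mykafkatopic"/>
  </rules>
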
diff --git a/hbase-kafka-model/pom.xml b/hbase-kafka-model/pom.xml
new file mode 100644
index 0000000..8c497b1
--- /dev/null
+++ b/hbase-kafka-model/pom.xml
@@ -0,0 +1,220 @@
+<?xml version="1.0"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <!--
+  /**
+   * Licensed to the Apache Software Foundation (ASF) under one
+   * or more contributor license agreements.  See the NOTICE file
+   * distributed with this work for additional information
+   * regarding copyright ownership.  The ASF licenses this file
+   * to you under the Apache License, Version 2.0 (the
+   * "License"); you may not use this file except in compliance
+   * with the License.  You may obtain a copy of the License at
+   *
+   *     http://www.apache.org/licenses/LICENSE-2.0
+   *
+   * Unless required by applicable law or agreed to in writing, software
+   * distributed under the License is distributed on an "AS IS" BASIS,
+   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   * See the License for the specific language governing permissions and
+   * limitations under the License.
+   */
+  -->
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.hbase</groupId>
+    <artifactId>hbase-connectors</artifactId>
+    <version>1.0.0-SNAPSHOT</version>
+    <relativePath>..</relativePath>
+  </parent>
+
+  <artifactId>hbase-kafka-model</artifactId>
+  <name>Apache HBase - Model Objects for Kafka Proxy</name>
+  <description>Model objects that represent HBase mutations</description>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.avro</groupId>
+      <artifactId>avro</artifactId>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <sourceDirectory>${project.basedir}/target/java</sourceDirectory>
+    <resources>
+      <resource>
+        <directory>src/main/resources/</directory>
+        <includes>
+          <include>hbase-default.xml</include>
+        </includes>
+      </resource>
+    </resources>
+    <testResources>
+      <testResource>
+        <directory>src/test/resources/META-INF/</directory>
+        <targetPath>META-INF/</targetPath>
+        <includes>
+          <include>NOTICE</include>
+        </includes>
+        <filtering>true</filtering>
+      </testResource>
+    </testResources>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.avro</groupId>
+        <artifactId>avro-maven-plugin</artifactId>
+        <version>${avro.version}</version>
+        <executions>
+          <execution>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>schema</goal>
+            </goals>
+            <configuration>
+              <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
+              <outputDirectory>${project.basedir}/target/java/</outputDirectory>
+              <includes>
+                <include>**/*.avro</include>
+              </includes>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-remote-resources-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-site-plugin</artifactId>
+        <configuration>
+          <skip>true</skip>
+        </configuration>
+      </plugin>
+      <plugin>
+        <!--Make it so assembly:single does nothing in here-->
+        <artifactId>maven-assembly-plugin</artifactId>
+        <configuration>
+          <skipAssembly>true</skipAssembly>
+        </configuration>
+      </plugin>
+      <plugin>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <configuration>
+          <properties>
+            <property>
+              <name>listener</name>
+              <value>org.apache.hadoop.hbase.ResourceCheckerJUnitListener</value>
+            </property>
+          </properties>
+        </configuration>
+      </plugin>
+      <!-- Make a jar and put the sources in the jar -->
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-source-plugin</artifactId>
+        <configuration>
+          <excludes>
+            <exclude>hbase-default.xml</exclude>
+          </excludes>
+        </configuration>
+      </plugin>
+    </plugins>
+    <pluginManagement>
+      <plugins>
+        <!--This plugin's configuration is used to store Eclipse m2e settings
+             only. It has no influence on the Maven build itself. -->
+        <plugin>
+          <groupId>org.eclipse.m2e</groupId>
+          <artifactId>lifecycle-mapping</artifactId>
+          <version>1.0.0</version>
+          <configuration>
+            <lifecycleMappingMetadata>
+              <pluginExecutions>
+                <pluginExecution>
+                  <pluginExecutionFilter>
+                    <groupId>org.apache.maven.plugins</groupId>
+                    <artifactId>maven-antrun-plugin</artifactId>
+                    <versionRange>[${maven.antrun.version}]</versionRange>
+                    <goals>
+                      <goal>run</goal>
+                    </goals>
+                  </pluginExecutionFilter>
+                  <action>
+                    <execute/>
+                  </action>
+                </pluginExecution>
+                <pluginExecution>
+                  <pluginExecutionFilter>
+                    <groupId>org.apache.maven.plugins</groupId>
+                    <artifactId>maven-dependency-plugin</artifactId>
+                    <versionRange>[2.8,)</versionRange>
+                    <goals>
+                      <goal>build-classpath</goal>
+                    </goals>
+                  </pluginExecutionFilter>
+                  <action>
+                    <ignore></ignore>
+                  </action>
+                </pluginExecution>
+                <pluginExecution>
+                  <pluginExecutionFilter>
+                    <groupId>org.apache.maven.plugins</groupId>
+                    <artifactId>maven-compiler-plugin</artifactId>
+                    <versionRange>[3.2,)</versionRange>
+                    <goals>
+                      <goal>compile</goal>
+                    </goals>
+                  </pluginExecutionFilter>
+                  <action>
+                    <ignore></ignore>
+                  </action>
+                </pluginExecution>
+              </pluginExecutions>
+            </lifecycleMappingMetadata>
+          </configuration>
+        </plugin>
+      </plugins>
+    </pluginManagement>
+  </build>
+
+
+  <profiles>
+    <!-- Needs to make the profile in apache parent pom -->
+    <profile>
+      <id>apache-release</id>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-resources-plugin</artifactId>
+            <executions>
+              <execution>
+                <id>license-javadocs</id>
+                <phase>prepare-package</phase>
+                <goals>
+                  <goal>copy-resources</goal>
+                </goals>
+                <configuration>
+                  <outputDirectory>${project.build.directory}/apidocs</outputDirectory>
+                  <resources>
+                    <resource>
+                      <directory>src/main/javadoc/META-INF/</directory>
+                      <targetPath>META-INF/</targetPath>
+                      <includes>
+                        <include>NOTICE</include>
+                      </includes>
+                      <filtering>true</filtering>
+                    </resource>
+                  </resources>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+  </profiles>
+</project>
diff --git a/hbase-kafka-model/src/main/avro/HbaseKafkaEvent.avro b/hbase-kafka-model/src/main/avro/HbaseKafkaEvent.avro
new file mode 100644
index 0000000..ec88627
--- /dev/null
+++ b/hbase-kafka-model/src/main/avro/HbaseKafkaEvent.avro
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+{"namespace": "org.apache.hadoop.hbase.kafka",
+ "type": "record",
+ "name": "HBaseKafkaEvent",
+ "fields": [
+    {"name": "key", "type": "bytes"},
+    {"name": "timestamp",  "type": "long" },
+    {"name": "delete",  "type": "boolean" },
+    {"name": "value", "type": "bytes"},
+    {"name": "qualifier", "type": "bytes"},
+    {"name": "family", "type": "bytes"},
+    {"name": "table", "type": "bytes"}
+ ]
+}
diff --git a/hbase-kafka-proxy/.pom.xml.swp b/hbase-kafka-proxy/.pom.xml.swp
new file mode 100644
index 0000000..0507dd6
Binary files /dev/null and b/hbase-kafka-proxy/.pom.xml.swp differ
diff --git a/hbase-kafka-proxy/pom.xml b/hbase-kafka-proxy/pom.xml
new file mode 100644
index 0000000..7ece36f
--- /dev/null
+++ b/hbase-kafka-proxy/pom.xml
@@ -0,0 +1,258 @@
+<?xml version="1.0"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <!--
+  /**
+   * Licensed to the Apache Software Foundation (ASF) under one
+   * or more contributor license agreements.  See the NOTICE file
+   * distributed with this work for additional information
+   * regarding copyright ownership.  The ASF licenses this file
+   * to you under the Apache License, Version 2.0 (the
+   * "License"); you may not use this file except in compliance
+   * with the License.  You may obtain a copy of the License at
+   *
+   *     http://www.apache.org/licenses/LICENSE-2.0
+   *
+   * Unless required by applicable law or agreed to in writing, software
+   * distributed under the License is distributed on an "AS IS" BASIS,
+   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   * See the License for the specific language governing permissions and
+   * limitations under the License.
+   */
+  -->
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.hbase</groupId>
+    <artifactId>hbase-connectors</artifactId>
+    <version>1.0.0-SNAPSHOT</version>
+    <relativePath>..</relativePath>
+  </parent>
+
+  <artifactId>hbase-kafka-proxy</artifactId>
+  <name>Apache HBase - Kafka Proxy</name>
+  <description>Proxy that forwards HBase replication events to a Kafka broker</description>
+  <properties>
+    <collections.version>4.1</collections.version>
+    <commons-lang3.version>3.6</commons-lang3.version>
+    <commons-io.version>2.5</commons-io.version>
+    <kafka-clients.version>2.0.0</kafka-clients.version>
+    <commons-io.version>2.5</commons-io.version>
+  </properties>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-remote-resources-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-site-plugin</artifactId>
+        <configuration>
+          <skip>true</skip>
+        </configuration>
+      </plugin>
+      <plugin>
+        <!--Make it so assembly:single does nothing in here-->
+        <artifactId>maven-assembly-plugin</artifactId>
+        <configuration>
+          <skipAssembly>true</skipAssembly>
+        </configuration>
+      </plugin>
+      <plugin>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <configuration>
+          <properties>
+            <property>
+              <name>listener</name>
+              <value>org.apache.hadoop.hbase.ResourceCheckerJUnitListener</value>
+            </property>
+          </properties>
+        </configuration>
+      </plugin>
+    </plugins>
+    <pluginManagement>
+      <plugins>
+        <!--This plugin's configuration is used to store Eclipse m2e settings
+             only. It has no influence on the Maven build itself. -->
+        <plugin>
+          <groupId>org.eclipse.m2e</groupId>
+          <artifactId>lifecycle-mapping</artifactId>
+          <version>1.0.0</version>
+          <configuration>
+            <lifecycleMappingMetadata>
+              <pluginExecutions>
+                <pluginExecution>
+                  <pluginExecutionFilter>
+                    <groupId>org.apache.maven.plugins</groupId>
+                    <artifactId>maven-antrun-plugin</artifactId>
+                    <versionRange>[${maven.antrun.version}]</versionRange>
+                    <goals>
+                      <goal>run</goal>
+                    </goals>
+                  </pluginExecutionFilter>
+                  <action>
+                    <execute/>
+                  </action>
+                </pluginExecution>
+                <pluginExecution>
+                  <pluginExecutionFilter>
+                    <groupId>org.apache.maven.plugins</groupId>
+                    <artifactId>maven-dependency-plugin</artifactId>
+                    <versionRange>[2.8,)</versionRange>
+                    <goals>
+                      <goal>build-classpath</goal>
+                    </goals>
+                  </pluginExecutionFilter>
+                  <action>
+                    <ignore></ignore>
+                  </action>
+                </pluginExecution>
+                <pluginExecution>
+                  <pluginExecutionFilter>
+                    <groupId>org.apache.maven.plugins</groupId>
+                    <artifactId>maven-compiler-plugin</artifactId>
+                    <versionRange>[3.2,)</versionRange>
+                    <configuration>
+                      <source>${compileSource}</source>
+                      <target>${compileSource}</target>
+                      <showWarnings>true</showWarnings>
+                      <showDeprecation>false</showDeprecation>
+                      <useIncrementalCompilation>false</useIncrementalCompilation>
+                      <compilerArgument>-Xlint:-options</compilerArgument>
+                    </configuration>
+                    <goals>
+                      <goal>compile</goal>
+                    </goals>
+                  </pluginExecutionFilter>
+                  <action>
+                    <ignore></ignore>
+                  </action>
+                </pluginExecution>
+              </pluginExecutions>
+            </lifecycleMappingMetadata>
+          </configuration>
+        </plugin>
+      </plugins>
+    </pluginManagement>
+  </build>
+  <dependencies>
+   <dependency>
+      <groupId>org.apache.avro</groupId>
+      <artifactId>avro</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase.connectors</groupId>
+      <artifactId>hbase-kafka-model</artifactId>
+      <scope>compile</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-common</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-common</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-client</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-zookeeper</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-server</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-annotations</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-annotations</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka-clients</artifactId>
+      <version>${kafka-clients.version}</version>
+    </dependency>
+    <!-- General dependencies -->
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-lang3</artifactId>
+      <version>${commons-lang3.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-collections4</artifactId>
+      <version>${collections.version}</version>
+      <scope>compile</scope>
+    </dependency>
+    <dependency>
+      <groupId>commons-io</groupId>
+      <artifactId>commons-io</artifactId>
+      <version>${commons-io.version}</version>
+      <scope>compile</scope>
+    </dependency>
+  </dependencies>
+
+  <profiles>
+    <!-- Needs to make the profile in apache parent pom -->
+    <profile>
+      <id>apache-release</id>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-resources-plugin</artifactId>
+            <executions>
+              <execution>
+                <id>license-javadocs</id>
+                <phase>prepare-package</phase>
+                <goals>
+                  <goal>copy-resources</goal>
+                </goals>
+                <configuration>
+                  <outputDirectory>${project.build.directory}/apidocs</outputDirectory>
+                  <resources>
+                    <resource>
+                      <directory>src/main/javadoc/META-INF/</directory>
+                      <targetPath>META-INF/</targetPath>
+                      <includes>
+                        <include>NOTICE</include>
+                      </includes>
+                      <filtering>true</filtering>
+                    </resource>
+                  </resources>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+    <!-- Skip the tests in this module -->
+    <profile>
+      <id>skipKafkaProxyTests</id>
+      <activation>
+        <property>
+          <name>skipKafkaProxyTests</name>
+        </property>
+      </activation>
+      <properties>
+        <surefire.skipFirstPart>true</surefire.skipFirstPart>
+        <surefire.skipSecondPart>true</surefire.skipSecondPart>
+      </properties>
+    </profile>
+
+
+
+  </profiles>
+</project>
diff --git a/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/DropRule.java b/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/DropRule.java
new file mode 100644
index 0000000..8bc1eff
--- /dev/null
+++ b/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/DropRule.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.kafka;
+
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Rule that indicates the Cell should not be replicated
+ */
+@InterfaceAudience.Private
+public class DropRule extends Rule {
+  public DropRule() {
+  }
+}
diff --git a/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/DumpToStringListener.java b/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/DumpToStringListener.java
new file mode 100644
index 0000000..5874f35
--- /dev/null
+++ b/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/DumpToStringListener.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.kafka;
+
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.VersionInfo;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.org.apache.commons.cli.BasicParser;
+import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
+import org.apache.hbase.thirdparty.org.apache.commons.cli.HelpFormatter;
+import org.apache.hbase.thirdparty.org.apache.commons.cli.Options;
+import org.apache.hbase.thirdparty.org.apache.commons.cli.ParseException;
+
+
+/**
+ * Connects to Kafka and reads from the passed-in topics. Parses each message into an Avro object
+ * and dumps it to the console.
+ */
+@InterfaceAudience.Private
+public final class DumpToStringListener {
+  private static final Logger LOG = LoggerFactory.getLogger(DumpToStringListener.class);
+
+  private DumpToStringListener(){
+
+  }
+
+  public static void main(String[] args) {
+    LOG.info("***** STARTING service '" + DumpToStringListener.class.getSimpleName() + "' *****");
+    VersionInfo.logVersion();
+
+    Options options = new Options();
+    options.addOption("k", "kafkabrokers", true, "Kafka Brokers " +
+            "(comma delimited)");
+    options.addOption("t", "kafkatopics", true,"Kafka Topics "
+        + "to subscribe to (comma delimited)");
+    CommandLine commandLine = null;
+    try {
+      commandLine = new BasicParser().parse(options, args);
+    } catch (ParseException e) {
+      LOG.error("Could not parse: ", e);
+      printUsageAndExit(options, -1);
+    }
+    SpecificDatumReader<HBaseKafkaEvent> dreader =
+            new SpecificDatumReader<>(HBaseKafkaEvent.SCHEMA$);
+
+    String topic = commandLine.getOptionValue('t');
+    Properties props = new Properties();
+    props.put("bootstrap.servers", commandLine.getOptionValue('k'));
+    props.put("group.id", "hbase kafka test tool");
+    props.put("key.deserializer", ByteArrayDeserializer.class.getName());
+    props.put("value.deserializer", ByteArrayDeserializer.class.getName());
+
+    try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);){
+      consumer.subscribe(Arrays.stream(topic.split(",")).collect(Collectors.toList()));
+
+      while (true) {
+        ConsumerRecords<byte[], byte[]> records = consumer.poll(10000);
+        Iterator<ConsumerRecord<byte[], byte[]>> it = records.iterator();
+        while (it.hasNext()) {
+          ConsumerRecord<byte[], byte[]> record = it.next();
+          BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(record.value(), null);
+          try {
+            HBaseKafkaEvent event = dreader.read(null, decoder);
+            LOG.debug("key :" + Bytes.toString(record.key()) + " value " + event);
+          } catch (Exception e) {
+            throw new RuntimeException(e);
+          }
+        }
+      }
+    }
+  }
+
+  private static void printUsageAndExit(Options options, int exitCode) {
+    HelpFormatter formatter = new HelpFormatter();
+    formatter.printHelp("hbase " + DumpToStringListener.class.getName(), "", options,
+            "\n[--kafkabrokers <kafka brokers (comma delimited)>] " +
+                    "[-k <kafka brokers (comma delimited)>] \n", true);
+    System.exit(exitCode);
+  }
+
+}
diff --git a/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/KafkaBridgeConnection.java b/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/KafkaBridgeConnection.java
new file mode 100644
index 0000000..55ded5c
--- /dev/null
+++ b/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/KafkaBridgeConnection.java
@@ -0,0 +1,216 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.kafka;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.BufferedMutator;
+import org.apache.hadoop.hbase.client.BufferedMutatorParams;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.RegionLocator;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableBuilder;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+
+/**
+ * An alternative implementation of a connection object that forwards the mutations to a
+ * kafka queue depending on the routing rules (see kafka-route-rules.xml).
+ */
+@InterfaceAudience.Private
+public class KafkaBridgeConnection implements Connection {
+  private static final Logger LOG = LoggerFactory.getLogger(KafkaBridgeConnection.class);
+
+  private final Configuration conf;
+  private final User user;
+  private final ExecutorService pool;
+  private volatile boolean closed = false;
+  private TopicRoutingRules routingRules;
+  private Producer<byte[],byte[]> producer;
+  private DatumWriter<HBaseKafkaEvent> avroWriter =
+      new SpecificDatumWriter<>(HBaseKafkaEvent.getClassSchema());
+
+
+    /**
+     * Public constructor
+     * @param conf hbase configuration
+     * @param pool executor pool
+     * @param user user who requested connection
+     * @throws IOException on error
+     */
+  public KafkaBridgeConnection(Configuration conf,
+                               ExecutorService pool,
+                               User user) throws IOException {
+    this.conf = conf;
+    this.user = user;
+    this.pool = pool;
+    setupRules();
+    startKafkaConnection();
+  }
+
+  /**
+   * for testing.
+   * @param conf  hbase configuration
+   * @param pool  executor service
+   * @param user  user with connection
+   * @param routingRules a set of routing rules
+   * @param producer a kafka producer
+   * @throws IOException on error
+   */
+  public KafkaBridgeConnection(Configuration conf,
+                                ExecutorService pool,
+                                User user,
+                                TopicRoutingRules routingRules,
+                               Producer<byte[],byte[]> producer)
+          throws IOException {
+    this.conf = conf;
+    this.user = user;
+    this.pool = pool;
+    this.producer=producer;
+    this.routingRules=routingRules;
+  }
+
+  private void setupRules() throws IOException {
+    String file = this.conf.get(KafkaProxy.KAFKA_PROXY_RULES_FILE);
+    routingRules = new TopicRoutingRules();
+    try (FileInputStream fin = new FileInputStream(file);){
+      routingRules.parseRules(fin);
+    }
+  }
+
+  private void startKafkaConnection() throws IOException {
+    Properties configProperties = new Properties();
+
+    String kafkaPropsFile = conf.get(KafkaProxy.KAFKA_PROXY_KAFKA_PROPERTIES,"");
+    if (!StringUtils.isEmpty(kafkaPropsFile)){
+      try (FileInputStream fs = new java.io.FileInputStream(
+          new File(kafkaPropsFile))){
+        configProperties.load(fs);
+      }
+    } else {
+      String kafkaServers =conf.get(KafkaProxy.KAFKA_PROXY_KAFKA_BROKERS);
+      configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers);
+    }
+
+    configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+        "org.apache.kafka.common.serialization.ByteArraySerializer");
+    configProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
+        "org.apache.kafka.common.serialization.ByteArraySerializer");
+    this.producer = new KafkaProducer<byte[], byte[]>(configProperties);
+  }
+
+
+
+  @Override
+  public void abort(String why, Throwable e) {}
+
+  @Override
+  public boolean isAborted() {
+    return false;
+  }
+
+  @Override
+  public Configuration getConfiguration() {
+    return this.conf;
+  }
+
+  @Override
+  public BufferedMutator getBufferedMutator(TableName tableName) throws IOException {
+    return null;
+  }
+
+  @Override
+  public BufferedMutator getBufferedMutator(BufferedMutatorParams params) throws IOException {
+    return null;
+  }
+
+  @Override
+  public RegionLocator getRegionLocator(TableName tableName) throws IOException {
+    return null;
+  }
+
+  @Override
+  public Admin getAdmin() throws IOException {
+    return null;
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (!this.closed) {
+      this.closed = true;
+      this.producer.close();
+    }
+  }
+
+  @Override
+  public boolean isClosed() {
+    return this.closed;
+  }
+
+  @Override
+  public TableBuilder getTableBuilder(final TableName tn, ExecutorService pool) {
+    if (isClosed()) {
+      throw new RuntimeException("KafkaBridgeConnection is closed.");
+    }
+    final Configuration passedInConfiguration = getConfiguration();
+    return new TableBuilder() {
+      @Override
+      public TableBuilder setOperationTimeout(int timeout) {
+        return null;
+      }
+
+      @Override
+      public TableBuilder setRpcTimeout(int timeout) {
+        return null;
+      }
+
+      @Override
+      public TableBuilder setReadRpcTimeout(int timeout) {
+        return null;
+      }
+
+      @Override
+      public TableBuilder setWriteRpcTimeout(int timeout) {
+        return null;
+      }
+
+      @Override
+      public Table build() {
+        return new KafkaTableForBridge(tn,passedInConfiguration,routingRules,producer,avroWriter) ;
+      }
+    };
+  }
+
+}
diff --git a/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/KafkaProxy.java b/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/KafkaProxy.java
new file mode 100644
index 0000000..14c9179
--- /dev/null
+++ b/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/KafkaProxy.java
@@ -0,0 +1,341 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.kafka;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ReplicationPeerNotFoundException;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.HRegionServerCommandLine;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.VersionInfo;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.org.apache.commons.cli.BasicParser;
+import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
+import org.apache.hbase.thirdparty.org.apache.commons.cli.HelpFormatter;
+import org.apache.hbase.thirdparty.org.apache.commons.cli.Options;
+import org.apache.hbase.thirdparty.org.apache.commons.cli.ParseException;
+
+
+
+/**
+ * hbase to kafka bridge.
+ *
+ * Starts up a region server and receives replication events, just like a peer
+ * cluster member.  It takes the events and cell by cell determines how to
+ * route them (see kafka-route-rules.xml)
+ */
+@InterfaceAudience.Private
+public final class KafkaProxy {
+  private static final Logger LOG = LoggerFactory.getLogger(KafkaProxy.class);
+
+  public static final String KAFKA_PROXY_RULES_FILE = "kafkaproxy.rule.file";
+  public static final String KAFKA_PROXY_KAFKA_PROPERTIES = "kafkaproxy.kafka.properties";
+  public static final String KAFKA_PROXY_KAFKA_BROKERS = "kafkaproxy.kafka.brokers";
+
+  private static Map<String,String> DEFAULT_PROPERTIES = new HashMap<>();
+
+  static {
+    DEFAULT_PROPERTIES.put("hbase.cluster.distributed","true");
+    DEFAULT_PROPERTIES.put("zookeeper.znode.parent","/kafkaproxy");
+    DEFAULT_PROPERTIES.put("hbase.regionserver.port","17020");
+    DEFAULT_PROPERTIES.put("hbase.regionserver.info.port","17010");
+    DEFAULT_PROPERTIES.put("hbase.client.connection.impl",
+            "org.apache.hadoop.hbase.kafka.KafkaBridgeConnection");
+    DEFAULT_PROPERTIES.put("hbase.regionserver.admin.service","false");
+    DEFAULT_PROPERTIES.put("hbase.regionserver.client.service","false");
+    DEFAULT_PROPERTIES.put("hbase.wal.provider",
+            "org.apache.hadoop.hbase.wal.DisabledWALProvider");
+    DEFAULT_PROPERTIES.put("hbase.regionserver.workers","false");
+    DEFAULT_PROPERTIES.put("hfile.block.cache.size","0.0001");
+    DEFAULT_PROPERTIES.put("hbase.mob.file.cache.size","0");
+    DEFAULT_PROPERTIES.put("hbase.masterless","true");
+    DEFAULT_PROPERTIES.put("hbase.regionserver.metahandler.count","1");
+    DEFAULT_PROPERTIES.put("hbase.regionserver.replication.handler.count","1");
+    DEFAULT_PROPERTIES.put("hbase.regionserver.handler.count","1");
+    DEFAULT_PROPERTIES.put("hbase.ipc.server.read.threadpool.size","3");
+  }
+
+  private static void printUsageAndExit(Options options, int exitCode) {
+    HelpFormatter formatter = new HelpFormatter();
+    formatter.printHelp("hbase kafkaproxy start", "", options,
+      "\nTo run the kafka proxy as a daemon, execute " +
+        "hbase-daemon.sh start|stop kafkaproxy " +
+        "[--kafkabrokers <kafka brokers (comma delimited)>] " +
+        "[-b <kafka brokers (comma delimited)>] " +
+        "[--routerulesfile <file with rules to route to kafka "
+        + "(defaults to kafka-route-rules.xml)>] " +
+        "[-r <file with rules to route to kafka "
+        + "(defaults to kafka-route-rules.xml)>] " +
+        "[--kafkaproperties <Path to properties file that "
+        + "has the kafka connection properties>] " +
+        "[-f <Path to properties file that has the kafka "
+        + "connection properties>] " +
+        "[--peername name of hbase peer to use (defaults to hbasekafka)]  " +
+        "[-p name of hbase peer to use (defaults to hbasekafka)]  " +
+        "[--znode root znode (defaults to /kafkaproxy)]  " +
+        "[-z root znode (defaults to /kafkaproxy)]  " +
+
+        "[--enablepeer enable peer on startup (defaults to false)]  " +
+        "[-e enable peer on startup (defaults to false)]  " +
+
+        "[--auto auto create peer]  " +
+        "[-a auto create peer] \n", true);
+    System.exit(exitCode);
+  }
+
+  /**
+   * private constructor
+   */
+  private KafkaProxy() {
+
+  }
+
+  /**
+   * Start the service
+   * @param args program arguments
+   * @throws Exception on error
+   */
+  public static void main(String[] args) throws Exception {
+
+    Map<String,String> otherProps = new HashMap<>();
+
+    Options options = new Options();
+
+    options.addOption("b", "kafkabrokers", true,
+      "Kafka Brokers (comma delimited)");
+    options.addOption("r", "routerulesfile", true,
+      "file that has routing rules (defaults to conf/kafka-route-rules.xml)");
+    options.addOption("f", "kafkaproperties", true,
+      "Path to properties file that has the kafka connection properties");
+    options.addOption("p", "peername", true,
+        "Name of hbase peer");
+    options.addOption("z", "znode", true,
+        "root znode to use in zookeeper (defaults to /kafkaproxy)");
+    options.addOption("a", "autopeer", false,
+        "Create a peer automatically to the hbase cluster");
+    options.addOption("e", "enablepeer", false,
+        "enable peer on startup (defaults to false)");
+
+
+    LOG.info("STARTING executorService " + HRegionServer.class.getSimpleName());
+    VersionInfo.logVersion();
+
+    Configuration conf = HBaseConfiguration.create();
+    CommandLine commandLine = null;
+    try {
+      commandLine = new BasicParser().parse(options, args);
+    } catch (ParseException e) {
+      LOG.error("Could not parse: ", e);
+      printUsageAndExit(options, -1);
+    }
+
+    String peer="";
+    if (!commandLine.hasOption('p')){
+      System.err.println("hbase peer id is required");
+      System.exit(-1);
+    } else {
+      peer = commandLine.getOptionValue('p');
+    }
+
+    boolean createPeer = false;
+    boolean enablePeer = false;
+
+    if (commandLine.hasOption('a')){
+      createPeer=true;
+    }
+
+    if (commandLine.hasOption("e")){
+      enablePeer=true;
+    }
+
+    String rulesFile = StringUtils.defaultIfBlank(
+            commandLine.getOptionValue("r"),"kafka-route-rules.xml");
+
+    if (!new File(rulesFile).exists()){
+      if (KafkaProxy.class.getClassLoader().getResource(rulesFile)!=null){
+        rulesFile = KafkaProxy.class.getClassLoader().getResource(rulesFile).getFile();
+      } else {
+        System.err.println("Rules file " + rulesFile +
+            " is invalid");
+        System.exit(-1);
+      }
+    }
+
+    otherProps.put(KafkaProxy.KAFKA_PROXY_RULES_FILE,rulesFile);
+
+    if (commandLine.hasOption('f')){
+      otherProps.put(KafkaProxy.KAFKA_PROXY_KAFKA_PROPERTIES,commandLine.getOptionValue('f'));
+    } else if (commandLine.hasOption('b')){
+      otherProps.put(KafkaProxy.KAFKA_PROXY_KAFKA_BROKERS,commandLine.getOptionValue('b'));
+    } else {
+      System.err.println("Kafka connection properties or brokers must be specified");
+      System.exit(-1);
+    }
+
+    String zookeeperQ = conf.get("hbase.zookeeper.quorum") + ":" +
+        conf.get("hbase.zookeeper.property.clientPort");
+
+    ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(20000, 20);
+
+    try (CuratorFramework zk = CuratorFrameworkFactory.newClient(zookeeperQ, retryPolicy);
+    ) {
+      zk.start();
+      String rootZnode = "/kafkaproxy";
+      setupZookeeperZnodes(zk,rootZnode,peer);
+      checkForOrCreateReplicationPeer(conf,zk,rootZnode,peer,createPeer,enablePeer);
+    }
+
+    @SuppressWarnings("unchecked")
+    Class<? extends HRegionServer> regionServerClass = (Class<? extends HRegionServer>) conf
+        .getClass(HConstants.REGION_SERVER_IMPL, HRegionServer.class);
+
+    List<String> allArgs = DEFAULT_PROPERTIES.keySet().stream()
+        .map((argKey)->("-D"+argKey+"="+ DEFAULT_PROPERTIES.get(argKey)))
+        .collect(Collectors.toList());
+
+    otherProps.keySet().stream()
+        .map((argKey)->("-D"+argKey+"="+ otherProps.get(argKey)))
+        .forEach((item)->allArgs.add(item));
+
+    Arrays.stream(args)
+        .filter((arg)->(arg.startsWith("-D")||arg.equals("start")))
+        .forEach((arg)->allArgs.add(arg));
+
+    LOG.info("Args passed to region server "+allArgs);
+
+    String[] newArgs=new String[allArgs.size()];
+    allArgs.toArray(newArgs);
+
+    new HRegionServerCommandLine(regionServerClass).doMain(newArgs);
+  }
+
+
+  /**
+   * Set up the needed znodes under the rootZnode
+   * @param zk CuratorFramework framework instance
+   * @param rootZnode Root znode
+   * @throws Exception If an error occurs
+   */
+  public static void setupZookeeperZnodes(CuratorFramework zk, String rootZnode,String peer)
+          throws Exception {
+    // always gives the same uuid for the same name
+    UUID uuid = UUID.nameUUIDFromBytes(Bytes.toBytes(peer));
+    String newValue = uuid.toString();
+    byte []uuidBytes = Bytes.toBytes(newValue);
+    String idPath=rootZnode+"/hbaseid";
+    if (zk.checkExists().forPath(idPath) == null) {
+      zk.create().creatingParentsIfNeeded().forPath(rootZnode +
+          "/hbaseid",uuidBytes);
+    } else {
+      // If the znode is there already make sure it has the
+      // expected value for the peer name.
+      byte[] znodeBytes = zk.getData().forPath(idPath).clone();
+      if (!Bytes.equals(znodeBytes,uuidBytes)){
+        String oldValue = Bytes.toString(znodeBytes);
+        LOG.warn("znode "+idPath+" has unexpected value "+ oldValue
+            +" expecting " + newValue + " "
+            + " (did the peer name for the proxy change?) "
+            + "Updating value");
+        zk.setData().forPath(idPath, uuidBytes);
+      }
+    }
+  }
+
+  /**
+   * Poll for the configured peer or create it if it does not exist
+   *  (controlled by createIfMissing)
+   * @param hbaseConf the hbase configuration
+   * @param zk CuratorFramework object
+   * @param basePath base znode.
+   * @param peerName id of the peer to check for/create
+   * @param enablePeer if the peer is detected or created, enable it.
+   * @param createIfMissing if the peer doesn't exist, create it (pointed at this proxy's znode).
+   */
+  public static void checkForOrCreateReplicationPeer(Configuration hbaseConf,
+                        CuratorFramework zk,
+                        String basePath,
+                        String peerName, boolean createIfMissing,
+                        boolean enablePeer) {
+    try (Connection conn = ConnectionFactory.createConnection(hbaseConf);
+       Admin admin = conn.getAdmin()) {
+
+      boolean peerThere = false;
+
+      while (!peerThere) {
+        try {
+          ReplicationPeerConfig peerConfig = admin.getReplicationPeerConfig(peerName);
+          if (peerConfig !=null) {
+            peerThere=true;
+          }
+        } catch (ReplicationPeerNotFoundException e) {
+          if (createIfMissing) {
+            ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder();
+            // get the current cluster's ZK config
+            String zookeeperQ = hbaseConf.get("hbase.zookeeper.quorum") +
+                ":" +
+                hbaseConf.get("hbase.zookeeper.property.clientPort");
+            String znodePath = zookeeperQ + ":"+basePath;
+            ReplicationPeerConfig rconf = builder.setClusterKey(znodePath).build();
+            admin.addReplicationPeer(peerName, rconf);
+            peerThere = true;
+          }
+        }
+
+        if (peerThere) {
+          if (enablePeer){
+            LOG.info("enable peer," + peerName);
+            admin.enableReplicationPeer(peerName);
+          }
+          break;
+        } else {
+          LOG.info("peer "+
+                  peerName+" not found, service will not completely start until the peer exists");
+        }
+        Thread.sleep(5000);
+      }
+
+      LOG.info("found replication peer " + peerName);
+
+    } catch (Exception e) {
+      LOG.error("Exception running proxy ",e);
+    }
+  }
+}
diff --git a/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/KafkaTableForBridge.java b/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/KafkaTableForBridge.java
new file mode 100644
index 0000000..ec8034d
--- /dev/null
+++ b/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/KafkaTableForBridge.java
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.kafka;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Future;
+import java.util.stream.Collectors;
+
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Row;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
+
+
+@InterfaceAudience.Private
+public class KafkaTableForBridge implements Table {
+  private Logger LOG = LoggerFactory.getLogger(KafkaTableForBridge.class);
+
+  private final Configuration conf;
+  private final TableName tableName;
+  private byte[] tableAsBytes;
+
+  private Producer<byte[],byte[]> producer;
+  private TopicRoutingRules routingRules;
+
+  private DatumWriter<HBaseKafkaEvent> avroWriter;
+
+  private static final class CheckMutation {
+    byte[]qualifier;
+    byte[]family;
+    Cell cell;
+    List<String> topics = new ArrayList<>();
+  }
+
+  public KafkaTableForBridge(TableName tableName,
+                 Configuration conf,
+                 TopicRoutingRules routingRules,
+                 Producer<byte[],byte[]> producer,
+                 DatumWriter<HBaseKafkaEvent> avroWriter){
+    this.conf=conf;
+    this.tableName=tableName;
+    this.tableAsBytes=this.tableName.toBytes();
+    this.routingRules=routingRules;
+    this.producer=producer;
+    this.avroWriter=avroWriter;
+  }
+
+  private List<Pair<String,HBaseKafkaEvent>> processMutation(CheckMutation check, boolean isDelete){
+    HBaseKafkaEvent event = new HBaseKafkaEvent();
+    event.setKey(ByteBuffer.wrap(check.cell.getRowArray(),
+            check.cell.getRowOffset(),
+            check.cell.getRowLength()));
+    event.setTable(ByteBuffer.wrap(tableAsBytes));
+    event.setDelete(isDelete);
+    event.setTimestamp(check.cell.getTimestamp());
+    event.setFamily(ByteBuffer.wrap(check.family));
+    event.setQualifier(ByteBuffer.wrap(check.qualifier));
+    event.setValue(ByteBuffer.wrap(check.cell.getValueArray(),
+            check.cell.getValueOffset(),
+            check.cell.getValueLength()));
+
+    return check.topics.stream()
+        .map((topic)->new Pair<String,HBaseKafkaEvent>(topic,event))
+        .collect(Collectors.toList());
+  }
+
+  private boolean keep(CheckMutation ret){
+    if (!routingRules.isExclude(this.tableName,ret.family, ret.qualifier)){
+      return true;
+    }
+    return false;
+  }
+
+  private CheckMutation addTopics(CheckMutation ret){
+    ret.topics= routingRules.getTopics(this.tableName,ret.family,ret.qualifier);
+    return ret;
+  }
+
+  private ProducerRecord<byte[],byte[]> toByteArray(ByteArrayOutputStream bout,
+                                                    Pair<String,HBaseKafkaEvent> event,
+                                                    BinaryEncoder encoder) {
+    try {
+      bout.reset();
+      BinaryEncoder encoderUse = EncoderFactory.get().binaryEncoder(bout, encoder);
+      avroWriter.write(event.getSecond(), encoderUse);
+      encoder.flush();
+      return new ProducerRecord<byte[],byte[]>(event.getFirst(),
+              event.getSecond().getKey().array(),
+              bout.toByteArray());
+    } catch (Exception e){
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public void batch(final List<? extends Row> actions, Object[] results)
+      throws IOException, InterruptedException {
+
+    ByteArrayOutputStream bout = new ByteArrayOutputStream();
+    BinaryEncoder encoderUse = EncoderFactory.get().binaryEncoder(bout, null);
+
+    LOG.debug("got {} inputs", actions.size());
+
+    List<Future<RecordMetadata>> sends = new ArrayList<>();
+
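+    // For each mutation: explode it into its cells, drop cells excluded by the drop rules,
+    // look up the kafka topics for the remaining cells, serialize each one as an avro event
+    // and hand it to the producer asynchronously.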
+    actions.stream()
+      .filter((row)->row instanceof Mutation)
+      .map((row)->(Mutation)row)
+      .flatMap((row)->{
+        Mutation mut = (Mutation) row;
+        boolean isDelete = mut instanceof Delete;
+        return mut.getFamilyCellMap().keySet().stream()
+          .flatMap((family)->mut.getFamilyCellMap().get(family).stream())
+            .map((cell)->{
+              CheckMutation ret = new CheckMutation();
+              ret.family=CellUtil.cloneFamily(cell);
+              ret.qualifier=CellUtil.cloneQualifier(cell);
+              ret.cell=cell;
+              return ret;
+            })
+          .filter((check)->keep(check))
+          .map((check)->addTopics(check))
+          .filter((check)->!CollectionUtils.isEmpty(check.topics))
+          .flatMap((check)->processMutation(check,isDelete).stream());
+      })
+      .map((event)->toByteArray(bout,event,encoderUse))
+      .forEach((item)->sends.add(producer.send(item)));
+
+    // make sure the sends are done before returning
+    sends.stream().forEach((sendResult)->{
+      try {
+        sendResult.get();
+      } catch (Exception e){
+        LOG.error("Exception caught when getting result",e);
+        throw new RuntimeException(e);
+      }
+    });
+
+    this.producer.flush();
+  }
+
+  @Override
+  public TableName getName() {
+    return this.tableName;
+  }
+
+  @Override
+  public Configuration getConfiguration() {
+    return this.conf;
+  }
+
+  @Override
+  public HTableDescriptor getTableDescriptor() throws IOException {
+    return null;
+  }
+
+  @Override
+  public TableDescriptor getDescriptor() throws IOException {
+    return null;
+  }
+
+}
diff --git a/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/Rule.java b/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/Rule.java
new file mode 100644
index 0000000..7d02025
--- /dev/null
+++ b/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/Rule.java
@@ -0,0 +1,228 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.kafka;
+
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.yetus.audience.InterfaceAudience;
+
+
+/**
+ * Implements the matching logic for a rule
+ */
+@InterfaceAudience.Private
+public abstract class Rule {
+  TableName tableName;
+  private byte [] columnFamily;
+  private byte [] qualifier;
+
+  boolean qualifierStartsWith = false;
+  boolean qualifierEndsWith = false;
+
+  byte []ast = Bytes.toBytes("*");
+
+  /**
+   * Indicates if the table, column family, and qualifier match the rule
+   * @param tryTable table name to test
+   * @param tryColumnFamily column family to test
+   * @param tryQualifier qualifier to test
+   * @return true if values match the rule
+   */
+  public boolean match(TableName tryTable, byte [] tryColumnFamily, byte [] tryQualifier) {
+    boolean tableMatch = tableMatch(tryTable);
+    boolean columnFamilyMatch = columnFamilyMatch(tryColumnFamily);
+    boolean qualifierMatch = qualifierMatch(tryQualifier);
+
+    return tableMatch && columnFamilyMatch && qualifierMatch;
+  }
+
+  /**
+   * Test if the qualifier matches
+   * @param tryQualifier qualifier to test
+   * @return true if the qualifier matches
+   */
+  public boolean qualifierMatch(byte [] tryQualifier) {
+
+    if (qualifier != null) {
+      if (qualifierStartsWith && qualifierEndsWith) {
+        return (startsWith(tryQualifier, this.qualifier) || endsWith(tryQualifier, this.qualifier));
+      } else if (qualifierStartsWith) {
+        return startsWith(tryQualifier, this.qualifier);
+      } else if (qualifierEndsWith) {
+        return endsWith(tryQualifier, this.qualifier);
+      } else {
+        return Bytes.equals(this.qualifier, tryQualifier);
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Test if the column family matches the rule
+   * @param tryColumnFamily column family to test
+   * @return true if the column family matches the rule
+   */
+  public boolean columnFamilyMatch(byte [] tryColumnFamily) {
+    if (columnFamily != null) {
+      return Bytes.equals(this.columnFamily, tryColumnFamily);
+    }
+    return true;
+  }
+
+  /**
+   * Test if the table matches the table in the rule
+   * @param tryTable table name to test
+   * @return true if the table matches the rule
+   */
+  public boolean tableMatch(TableName tryTable) {
+    if (tableName == null) {
+      return true;
+    }
+    return (tryTable.equals(this.tableName));
+  }
+
+  /**
+   * set the column family for the rule
+   * @param columnFamily column family to set
+   */
+  public void setColumnFamily(byte [] columnFamily) {
+    this.columnFamily = columnFamily;
+  }
+
+  /**
+   * set the qualifier value for the rule
+   * @param qualifier qualifier to set
+   */
+  public void setQualifier(byte []qualifier) {
+    this.qualifier = qualifier;
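+    // A leading '*' ("*foo") means the rule matches qualifiers that END with the remainder,
+    // a trailing '*' ("foo*") matches qualifiers that START with it, and both ("*foo*")
+    // matches qualifiers that either start or end with it.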
+    if (startsWith(qualifier, ast)) {
+      qualifierEndsWith = true;
+      this.qualifier = ArrayUtils.subarray(this.qualifier, ast.length, this.qualifier.length);
+    }
+    if (endsWith(qualifier, ast)) {
+      qualifierStartsWith = true;
+      this.qualifier = ArrayUtils.subarray(this.qualifier, 0, this.qualifier.length - ast.length);
+    }
+    if ((qualifierStartsWith) || (qualifierEndsWith)) {
+      if (this.qualifier.length == 0) {
+        this.qualifier = null;
+      }
+    }
+
+  }
+
+  /**
+   * Tests if data starts with startsWith
+   * @param data byte array to test
+   * @param startsWith array that we want to see if data starts with
+   * @return true if data starts with startsWith
+   */
+  public static boolean startsWith(byte [] data, byte [] startsWith) {
+    if (startsWith.length > data.length) {
+      return false;
+    }
+
+    if (startsWith.length == data.length) {
+      return Bytes.equals(data, startsWith);
+    }
+
+    for (int i = 0; i < startsWith.length; i++) {
+      if (startsWith[i] != data[i]) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Tests if data ends with endsWith
+   * @param data byte array to test
+   * @param endsWith array that we want to see if data ends with
+   * @return true if data ends with endsWith
+   */
+  public static boolean endsWith(byte [] data, byte [] endsWith) {
+    if (endsWith.length > data.length) {
+      return false;
+    }
+
+    if (endsWith.length == data.length) {
+      return Bytes.equals(data, endsWith);
+    }
+
+    int endStart = data.length - endsWith.length;
+
+    for (int i = 0; i < endsWith.length; i++) {
+      if (endsWith[i] != data[endStart + i]) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * get the table for the rule
+   * @return table name for the rule
+   */
+  public TableName getTableName() {
+    return tableName;
+  }
+
+  /**
+   * set the table for the rule
+   * @param tableName to set
+   */
+  public void setTableName(TableName tableName) {
+    this.tableName = tableName;
+  }
+
+  /**
+   * get the column family for the rule
+   * @return column family
+   */
+  public byte[] getColumnFamily() {
+    return columnFamily;
+  }
+
+  /**
+   * get the qualifier for the rule
+   * @return qualifier
+   */
+  public byte[] getQualifier() {
+    return qualifier;
+  }
+
+
+  /**
+   * indicates if the qualifier is a wildcard like *foo
+   * @return true if rule is like *foo
+   */
+  public boolean isQualifierEndsWith() {
+    return qualifierEndsWith;
+  }
+
+  /**
+   * indicates if the qualifier is a wildcard like foo*
+   * @return true if rule is like foo*
+   */
+  public boolean isQualifierStartsWith() {
+    return qualifierStartsWith;
+  }
+}
diff --git a/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/TopicRoutingRules.java b/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/TopicRoutingRules.java
new file mode 100644
index 0000000..c8b818c
--- /dev/null
+++ b/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/TopicRoutingRules.java
@@ -0,0 +1,237 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.kafka;
+
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.w3c.dom.NodeList;
+
+/**
+ * The topic routing/drop rules.
+ *
+ *  &lt;rules&gt;
+ *      &lt;rule .... /&gt;
+ *
+ *  &lt;/rules&gt;
+ *
+ *
+ *
+ * A wildcard ('*') can only appear at the beginning or the end of the qualifier (it may appear at both).
+ *
+ * drop rules are always evaluated first.
+ *
+ * drop examples:
+ * &lt;rule action="drop" table="default:MyTable" /&gt;
+ * Do not send replication events for table MyTable
+ *
+ * &lt;rule action="drop" table="default:MyTable" columnFamily="data"/&gt;
+ * Do not send replication events for table MyTable's column family data
+ *
+ * &lt;rule action="drop" table="default:MyTable" columnFamily="data" qualifier="dhold:*"/&gt;
+ * Do not send replication events for any qualifier starting with dhold: in column family data
+ * of table MyTable
+ *
+ * route examples:
+ *
+ * &lt;rule action="route" table="default:MyTable" topic="mytopic"/&gt;
+ * Route all replication events for table default:MyTable to topic mytopic
+ *
+ * &lt;rule action="route" table="default:MyTable" columnFamily="data" topic="mytopic"/&gt;
+ * Route all replication events for table default:MyTable column family data to topic mytopic
+ *
+ * &lt;rule action="route" table="default:MyTable" columnFamily="data" topic="mytopic"
+ *  qualifier="hold:*"/&gt;
+ * Route all replication events for qualifiers that start with hold: in column family data of
+ *  table default:MyTable to topic mytopic
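+ *
+ * A minimal usage sketch (hypothetical variable names, assuming a rules file like
+ * conf/kafka-route-rules.xml):
+ * <pre>
+ *   TopicRoutingRules rules = new TopicRoutingRules(new File("conf/kafka-route-rules.xml"));
+ *   if (!rules.isExclude(tableName, family, qualifier)) {
+ *     List&lt;String&gt; topics = rules.getTopics(tableName, family, qualifier);
+ *   }
+ * </pre>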
+ */
+@InterfaceAudience.Private
+public class TopicRoutingRules {
+
+  private List<DropRule> dropRules = new ArrayList<>();
+  private List<TopicRule> routeRules = new ArrayList<>();
+
+  private File sourceFile;
+
+  /**
+   * used for testing
+   */
+  public TopicRoutingRules() {
+
+  }
+
+  /**
+   * construct rule set from file
+   * @param source file that contains the rule set
+   * @throws Exception if load fails
+   */
+  public TopicRoutingRules(File source) throws Exception {
+    this.sourceFile = source;
+    this.reloadIfFile();
+  }
+
+  /**
+   * Reload the ruleset if it was parsed from a file
+   * @throws Exception error loading rule set
+   */
+  public void reloadIfFile() throws Exception {
+    if (this.sourceFile!=null){
+      List<DropRule> dropRulesSave = this.dropRules;
+      List<TopicRule> routeRulesSave = this.routeRules;
+
+      try (FileInputStream fin = new FileInputStream(this.sourceFile)) {
+        List<DropRule> dropRulesNew = new ArrayList<>();
+        List<TopicRule> routeRulesNew = new ArrayList<>();
+
+        parseRules(fin,dropRulesNew,routeRulesNew);
+
+        this.dropRules = dropRulesNew;
+        this.routeRules = routeRulesNew;
+
+      } catch (Exception e){
+        // roll back
+        this.dropRules=dropRulesSave;
+        this.routeRules=routeRulesSave;
+        // re-throw
+        throw e;
+      }
+    }
+  }
+
+  /**
+   * parse rules manually from an input stream
+   * @param input InputStream that contains rule text
+   */
+  public void parseRules(InputStream input) {
+    List<DropRule> dropRulesNew = new ArrayList<>();
+    List<TopicRule> routeRulesNew = new ArrayList<>();
+    parseRules(input,dropRulesNew,routeRulesNew);
+    this.dropRules = dropRulesNew;
+    this.routeRules = routeRulesNew;
+  }
+
+  /**
+   * Parse the XML in the InputStream into route/drop rules and store them in the passed in Lists
+   * @param input inputstream the contains the ruleset
+   * @param dropRules list to accumulate drop rules
+   * @param routeRules list to accumulate route rules
+   */
+  public void parseRules(InputStream input,List<DropRule> dropRules, List<TopicRule> routeRules) {
+    try {
+      DocumentBuilderFactory dbFactory = DocumentBuilderFactory.newInstance();
+      DocumentBuilder dBuilder = dbFactory.newDocumentBuilder();
+      Document doc = dBuilder.parse(input);
+      NodeList nodList = doc.getElementsByTagName("rule");
+      for (int i = 0; i < nodList.getLength(); i++) {
+        if (nodList.item(i) instanceof Element) {
+          parseRule((Element) nodList.item(i),dropRules,routeRules);
+        }
+      }
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Parse an individual rule from an Element.
+   * @param n the element
+   * @param dropRules list to accumulate drop rules
+   * @param routeRules list to accumulate route rules
+   */
+  public void parseRule(Element n, List<DropRule> dropRules, List<TopicRule> routeRules) {
+    Rule r = null;
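+    // any rule whose action is not "drop" is treated as a route rule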
+    if (n.getAttribute("action").equals("drop")) {
+      r = new DropRule();
+      dropRules.add((DropRule) r);
+    } else {
+      r = new TopicRule(n.getAttribute("topic"));
+      routeRules.add((TopicRule) r);
+    }
+    if (n.hasAttribute("table")) {
+      r.setTableName(TableName.valueOf(n.getAttribute("table")));
+    }
+    if (n.hasAttribute("columnFamily")) {
+      r.setColumnFamily(Bytes.toBytes(n.getAttribute("columnFamily")));
+    }
+    if (n.hasAttribute("qualifier")) {
+      String qual = n.getAttribute("qualifier");
+      r.setQualifier(Bytes.toBytes(qual));
+    }
+  }
+
+  /**
+   * Indicates if a cell mutation should be dropped instead of routed to Kafka.
+   * @param table table name to check
+   * @param columnFamily column family to check
+   * @param qualifier qualifier name to check
+   * @return true if the mutation should be dropped instead of routed to Kafka
+   */
+  public boolean isExclude(final TableName table, final byte []columnFamily,
+                           final byte[] qualifier) {
+    for (DropRule r : getDropRules()) {
+      if (r.match(table, columnFamily, qualifier)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Get topics for the table/column family/qualifier combination
+   * @param table table name to check
+   * @param columnFamily column family to check
+   * @param qualifier qualifier name to check
+   * @return list of topics that match the passed in values (or empty for none).
+   */
+  public List<String> getTopics(TableName table, byte []columnFamily, byte []qualifier) {
+    List<String> ret = new ArrayList<>();
+    for (TopicRule r : getRouteRules()) {
+      if (r.match(table, columnFamily, qualifier)) {
+        ret.addAll(r.getTopics());
+      }
+    }
+
+    return ret;
+  }
+
+  /**
+   * returns all the drop rules (used for testing)
+   * @return drop rules
+   */
+  public List<DropRule> getDropRules() {
+    return dropRules;
+  }
+
+  /**
+   * returns all the route rules (used for testing)
+   * @return route rules
+   */
+  public List<TopicRule> getRouteRules() {
+    return routeRules;
+  }
+}
diff --git a/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/TopicRule.java b/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/TopicRule.java
new file mode 100644
index 0000000..5e5b6bf
--- /dev/null
+++ b/hbase-kafka-proxy/src/main/java/org/apache/hadoop/hbase/kafka/TopicRule.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.kafka;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * If the Cell matches the rule, returns the configured topics.
+ */
+@InterfaceAudience.Private
+public class TopicRule extends Rule {
+  private Set<String> topics = new HashSet<>();
+
+  public TopicRule(String topics) {
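+    // a single rule may fan out to several topics, given as a comma-separated list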
+    this.topics.addAll(Arrays.stream(topics.split(",")).collect(Collectors.toList()));
+  }
+
+  public Set<String> getTopics() {
+    return topics;
+  }
+}
diff --git a/hbase-kafka-proxy/src/test/java/org/apache/hadoop/hbase/kafka/ProducerForTesting.java b/hbase-kafka-proxy/src/test/java/org/apache/hadoop/hbase/kafka/ProducerForTesting.java
new file mode 100644
index 0000000..df9fb6c
--- /dev/null
+++ b/hbase-kafka-proxy/src/test/java/org/apache/hadoop/hbase/kafka/ProducerForTesting.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hbase.kafka;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ProducerFencedException;
+
+/**
+ * Mocks Kafka producer for testing
+ */
+public class ProducerForTesting implements Producer<byte[], byte[]> {
+  Map<String, List<HBaseKafkaEvent>> messages = new HashMap<>();
+  SpecificDatumReader<HBaseKafkaEvent> dreader = new SpecificDatumReader<>(HBaseKafkaEvent.SCHEMA$);
+
+  public Map<String, List<HBaseKafkaEvent>> getMessages() {
+    return messages;
+  }
+
+  @Override
+  public void abortTransaction() throws ProducerFencedException {
+  }
+
+  @Override
+  public Future<RecordMetadata> send(ProducerRecord<byte[], byte[]> producerRecord) {
+    try {
+
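+      // decode the avro payload and remember it per topic so tests can assert on what was sent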
+      BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(producerRecord.value(), null);
+      HBaseKafkaEvent event = dreader.read(null, decoder);
+      if (!messages.containsKey(producerRecord.topic())) {
+        messages.put(producerRecord.topic(), new ArrayList<>());
+      }
+      messages.get(producerRecord.topic()).add(event);
+      return new Future<RecordMetadata>() {
+        @Override
+        public boolean cancel(boolean mayInterruptIfRunning) {
+          return false;
+        }
+
+        @Override
+        public boolean isCancelled() {
+          return false;
+        }
+
+        @Override
+        public boolean isDone() {
+          return false;
+        }
+
+        @Override
+        public RecordMetadata get() throws InterruptedException, ExecutionException {
+          return new RecordMetadata(null, 1, 1, 1, (long)1, 1, 1);
+        }
+
+        @Override
+        public RecordMetadata get(long timeout, TimeUnit unit)
+            throws InterruptedException, ExecutionException, TimeoutException {
+          return null;
+        }
+      };
+    } catch (Exception e) {
+      e.printStackTrace();
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public Future<RecordMetadata> send(ProducerRecord<byte[], byte[]> producerRecord,
+      Callback callback) {
+    return null;
+  }
+
+  @Override
+  public void flush() {
+  }
+
+  @Override
+  public List<PartitionInfo> partitionsFor(String s) {
+    return null;
+  }
+
+  @Override
+  public Map<MetricName, ? extends Metric> metrics() {
+    return null;
+  }
+
+  @Override
+  public void close() {
+  }
+
+  @Override
+  public void close(long l, TimeUnit timeUnit) {
+  }
+
+  @Override
+  public void initTransactions() {
+  }
+
+  @Override
+  public void beginTransaction() throws ProducerFencedException {
+  }
+
+  @Override
+  public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
+      String consumerGroupId) throws ProducerFencedException {
+  }
+
+  @Override
+  public void commitTransaction() throws ProducerFencedException {
+  }
+}
diff --git a/hbase-kafka-proxy/src/test/java/org/apache/hadoop/hbase/kafka/TestDropRule.java b/hbase-kafka-proxy/src/test/java/org/apache/hadoop/hbase/kafka/TestDropRule.java
new file mode 100644
index 0000000..ab5021c
--- /dev/null
+++ b/hbase-kafka-proxy/src/test/java/org/apache/hadoop/hbase/kafka/TestDropRule.java
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hbase.kafka;
+
+import java.io.ByteArrayInputStream;
+
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Test different cases of drop rules.
+ */
+@Category(SmallTests.class)
+public class TestDropRule {
+  private static final String DROP_RULE1 =
+      "<rules><rule action=\"drop\" table=\"default:MyTable\" /></rules>";
+  private static final String DROP_RULE2 =
+      "<rules><rule action=\"drop\" table=\"default:MyTable\" "
+      + "columnFamily=\"data\"/></rules>";
+  private static final String DROP_RULE3 =
+      "<rules><rule action=\"drop\" table=\"default:MyTable\" "
+      + "columnFamily=\"data\" qualifier=\"dhold\"/></rules>";
+
+  private static final String DROP_RULE4 =
+      "<rules><rule action=\"drop\" table=\"default:MyTable\" "
+      + "columnFamily=\"data\" qualifier=\"dhold:*\"/></rules>";
+  private static final String DROP_RULE5 =
+      "<rules><rule action=\"drop\" table=\"default:MyTable\" "
+      + "columnFamily=\"data\" qualifier=\"*pickme\"/></rules>";
+
+  private static final String DROP_RULE6 =
+      "<rules><rule action=\"drop\" table=\"default:MyTable\" "
+      + "columnFamily=\"data\" qualifier=\"*pickme*\"/></rules>";
+
+  @Test
+  public void testDropies1() {
+    TopicRoutingRules rules = new TopicRoutingRules();
+    try {
+      rules.parseRules(new ByteArrayInputStream(DROP_RULE1.getBytes("UTF-8")));
+      Assert.assertEquals(1, rules.getDropRules().size());
+      Assert.assertEquals(TableName.valueOf("default:MyTable"),
+        rules.getDropRules().get(0).getTableName());
+      Assert.assertEquals(null, rules.getDropRules().get(0).getColumnFamily());
+      Assert.assertEquals(null, rules.getDropRules().get(0).getQualifier());
+      Assert.assertEquals(0, rules.getRouteRules().size());
+    } catch (Exception e) {
+      e.printStackTrace();
+      Assert.fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testDropies2() {
+    TopicRoutingRules rules = new TopicRoutingRules();
+    try {
+      rules.parseRules(new ByteArrayInputStream(DROP_RULE2.getBytes("UTF-8")));
+      Assert.assertEquals(1, rules.getDropRules().size());
+      Assert.assertEquals(TableName.valueOf("default:MyTable"),
+        rules.getDropRules().get(0).getTableName());
+      Assert.assertTrue(
+        Bytes.equals("data".getBytes("UTF-8"), rules.getDropRules().get(0).getColumnFamily()));
+      Assert.assertEquals(null, rules.getDropRules().get(0).getQualifier());
+      Assert.assertEquals(0, rules.getRouteRules().size());
+    } catch (Exception e) {
+      e.printStackTrace();
+      Assert.fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testDropies3() {
+    TopicRoutingRules rules = new TopicRoutingRules();
+    try {
+      rules.parseRules(new ByteArrayInputStream(DROP_RULE3.getBytes("UTF-8")));
+      Assert.assertEquals(1, rules.getDropRules().size());
+      Assert.assertEquals(TableName.valueOf("default:MyTable"),
+        rules.getDropRules().get(0).getTableName());
+      Assert.assertTrue(
+        Bytes.equals("data".getBytes("UTF-8"), rules.getDropRules().get(0).getColumnFamily()));
+      Assert
+          .assertTrue(Bytes.equals(
+                  "dhold".getBytes("UTF-8"), rules.getDropRules().get(0).getQualifier()));
+      Assert.assertEquals(0, rules.getRouteRules().size());
+    } catch (Exception e) {
+      Assert.fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testDropies4() {
+    TopicRoutingRules rules = new TopicRoutingRules();
+    try {
+      rules.parseRules(new ByteArrayInputStream(DROP_RULE4.getBytes("UTF-8")));
+      Assert.assertEquals(1, rules.getDropRules().size());
+      Assert.assertEquals(TableName.valueOf("default:MyTable"),
+        rules.getDropRules().get(0).getTableName());
+      Assert.assertTrue(
+        Bytes.equals("data".getBytes("UTF-8"), rules.getDropRules().get(0).getColumnFamily()));
+      Assert.assertTrue(
+        Bytes.equals("dhold:".getBytes("UTF-8"), rules.getDropRules().get(0).getQualifier()));
+      Assert.assertEquals(0, rules.getRouteRules().size());
+
+      DropRule drop = rules.getDropRules().get(0);
+      Assert.assertFalse(
+        drop.match(TableName.valueOf("default:MyTable"),
+                "data".getBytes("UTF-8"),
+                "blah".getBytes("UTF-8")));
+      Assert.assertFalse(
+        drop.match(TableName.valueOf("default:MyTable"),
+                "data".getBytes("UTF-8"),
+                "dholdme".getBytes("UTF-8")));
+      Assert.assertTrue(
+        drop.match(TableName.valueOf("default:MyTable"),
+                "data".getBytes("UTF-8"),
+                "dhold:me".getBytes("UTF-8")));
+
+    } catch (Exception e) {
+      Assert.fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testDropies5() {
+    TopicRoutingRules rules = new TopicRoutingRules();
+    try {
+      rules.parseRules(new ByteArrayInputStream(DROP_RULE5.getBytes("UTF-8")));
+      Assert.assertEquals(1, rules.getDropRules().size());
+      Assert.assertEquals(TableName.valueOf("default:MyTable"),
+        rules.getDropRules().get(0).getTableName());
+      Assert.assertTrue(
+        Bytes.equals("data".getBytes("UTF-8"), rules.getDropRules().get(0).getColumnFamily()));
+      Assert.assertTrue(
+        Bytes.equals("pickme".getBytes("UTF-8"), rules.getDropRules().get(0).getQualifier()));
+      Assert.assertEquals(0, rules.getRouteRules().size());
+
+      DropRule drop = rules.getDropRules().get(0);
+      Assert.assertFalse(
+        drop.match(TableName.valueOf("default:MyTable"),
+                "data".getBytes("UTF-8"),
+                "blah".getBytes("UTF-8")));
+      Assert.assertFalse(drop.match(TableName.valueOf("default:MyTable"),
+              "data".getBytes("UTF-8"),
+              "blacickme".getBytes("UTF-8")));
+      Assert.assertTrue(drop.match(TableName.valueOf("default:MyTable"),
+              "data".getBytes("UTF-8"),
+              "hithere.pickme".getBytes("UTF-8")));
+
+    } catch (Exception e) {
+      Assert.fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testDropies6() {
+    TopicRoutingRules rules = new TopicRoutingRules();
+    try {
+      rules.parseRules(new ByteArrayInputStream(DROP_RULE6.getBytes("UTF-8")));
+      Assert.assertEquals(1, rules.getDropRules().size());
+      Assert.assertEquals(TableName.valueOf("default:MyTable"),
+        rules.getDropRules().get(0).getTableName());
+      Assert.assertTrue(
+        Bytes.equals("data".getBytes("UTF-8"), rules.getDropRules().get(0).getColumnFamily()));
+      Assert.assertTrue(
+        Bytes.equals("pickme".getBytes("UTF-8"), rules.getDropRules().get(0).getQualifier()));
+      Assert.assertEquals(0, rules.getRouteRules().size());
+
+      DropRule drop = rules.getDropRules().get(0);
+      Assert.assertFalse(
+        drop.match(TableName.valueOf("default:MyTable"),
+                "data".getBytes("UTF-8"),
+                "blah".getBytes("UTF-8")));
+      Assert.assertFalse(drop.match(TableName.valueOf("default:MyTable"),
+              "data".getBytes("UTF-8"),
+              "blacickme".getBytes("UTF-8")));
+      Assert.assertTrue(drop.match(TableName.valueOf("default:MyTable"),
+              "data".getBytes("UTF-8"),
+              "hithere.pickme".getBytes("UTF-8")));
+      Assert.assertTrue(drop.match(TableName.valueOf("default:MyTable"),
+              "data".getBytes("UTF-8"),
+              "pickme.pleaze.do.it".getBytes("UTF-8")));
+      Assert.assertFalse(drop.match(TableName.valueOf("default:MyTable"),
+              "data".getBytes("UTF-8"),
+              "please.pickme.pleaze".getBytes("UTF-8")));
+      Assert.assertTrue(drop.match(TableName.valueOf("default:MyTable"),
+              "data".getBytes("UTF-8"),
+              "pickme.pleaze.pickme".getBytes("UTF-8")));
+
+    } catch (Exception e) {
+      Assert.fail(e.getMessage());
+    }
+  }
+
+}
diff --git a/hbase-kafka-proxy/src/test/java/org/apache/hadoop/hbase/kafka/TestProcessMutations.java b/hbase-kafka-proxy/src/test/java/org/apache/hadoop/hbase/kafka/TestProcessMutations.java
new file mode 100644
index 0000000..a474cdc
--- /dev/null
+++ b/hbase-kafka-proxy/src/test/java/org/apache/hadoop/hbase/kafka/TestProcessMutations.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.kafka;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.security.PrivilegedAction;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Executors;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Row;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+
+/**
+ * Test that mutations are getting published to the topic
+ */
+@Category(SmallTests.class)
+public class TestProcessMutations {
+  private User user = new User() {
+    @Override
+    public String getShortName() {
+      return "my name";
+    }
+
+    @Override
+    public <T> T runAs(PrivilegedAction<T> action) {
+      return null;
+    }
+
+    @Override
+    public <T> T runAs(PrivilegedExceptionAction<T> action)
+            throws IOException, InterruptedException {
+      return null;
+    }
+  };
+
+  private static final String ROUTE_RULE1 =
+      "<rules><rule action=\"route\" table=\"MyNamespace:MyTable\" "
+          + "topic=\"foo\"/></rules>";
+
+  ProducerForTesting myTestingProducer;
+
+  @Before
+  public void setup() {
+    this.myTestingProducer=new ProducerForTesting();
+  }
+
+  @After
+  public void tearDown() {
+
+  }
+
+  @Test
+  public void testSendMessage() {
+    TopicRoutingRules rules = new TopicRoutingRules();
+    try {
+      rules.parseRules(new ByteArrayInputStream(ROUTE_RULE1.getBytes("UTF-8")));
+      Configuration conf = new Configuration();
+      KafkaBridgeConnection connection =
+          new KafkaBridgeConnection(
+                  conf,Executors.newSingleThreadExecutor(),user,rules,myTestingProducer);
+      long zeTimestamp = System.currentTimeMillis();
+      Put put = new Put("key1".getBytes("UTF-8"),zeTimestamp);
+      put.addColumn("FAMILY".getBytes("UTF-8"),
+              "not foo".getBytes("UTF-8"),
+              "VALUE should NOT pass".getBytes("UTF-8"));
+      put.addColumn("FAMILY".getBytes("UTF-8"),
+              "foo".getBytes("UTF-8"),
+              "VALUE should pass".getBytes("UTF-8"));
+      Table myTable = connection.getTable(TableName.valueOf("MyNamespace:MyTable"));
+      List<Row> rows = new ArrayList<>();
+      rows.add(put);
+      myTable.batch(rows,new Object[0]);
+
+      Assert.assertEquals(false,myTestingProducer.getMessages().isEmpty());
+
+    } catch (Exception e){
+      Assert.fail(e.getMessage());
+    }
+  }
+
+}
diff --git a/hbase-kafka-proxy/src/test/java/org/apache/hadoop/hbase/kafka/TestQualifierMatching.java b/hbase-kafka-proxy/src/test/java/org/apache/hadoop/hbase/kafka/TestQualifierMatching.java
new file mode 100644
index 0000000..3e9ff36
--- /dev/null
+++ b/hbase-kafka-proxy/src/test/java/org/apache/hadoop/hbase/kafka/TestQualifierMatching.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hbase.kafka;
+
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Make sure match rules work
+ */
+@Category(SmallTests.class)
+public class TestQualifierMatching {
+
+  @Test
+  public void testMatchQualifier() throws Exception {
+    DropRule rule = new DropRule();
+    rule.setQualifier("data".getBytes("UTF-8"));
+    Assert.assertTrue(rule.qualifierMatch("data".getBytes("UTF-8")));
+
+    rule = new DropRule();
+    rule.setQualifier("data1".getBytes("UTF-8"));
+    Assert.assertFalse(rule.qualifierMatch("data".getBytes("UTF-8")));
+
+    // if not set, it is a wildcard
+    rule = new DropRule();
+    Assert.assertTrue(rule.qualifierMatch("data".getBytes("UTF-8")));
+  }
+
+  @Test
+  public void testStartWithQualifier() throws  Exception{
+    DropRule rule = new DropRule();
+    rule.setQualifier("data*".getBytes("UTF-8"));
+    Assert.assertTrue(rule.isQualifierStartsWith());
+    Assert.assertFalse(rule.isQualifierEndsWith());
+
+    Assert.assertTrue(rule.qualifierMatch("data".getBytes("UTF-8")));
+    Assert.assertTrue(rule.qualifierMatch("data1".getBytes("UTF-8")));
+    Assert.assertTrue(rule.qualifierMatch("datafoobar".getBytes("UTF-8")));
+    Assert.assertFalse(rule.qualifierMatch("datfoobar".getBytes("UTF-8")));
+    Assert.assertFalse(rule.qualifierMatch("d".getBytes("UTF-8")));
+    Assert.assertFalse(rule.qualifierMatch("".getBytes("UTF-8")));
+  }
+
+  @Test
+  public void testEndsWithQualifier() throws Exception {
+    DropRule rule = new DropRule();
+    rule.setQualifier("*data".getBytes("UTF-8"));
+    Assert.assertFalse(rule.isQualifierStartsWith());
+    Assert.assertTrue(rule.isQualifierEndsWith());
+
+    Assert.assertTrue(rule.qualifierMatch("data".getBytes("UTF-8")));
+    Assert.assertTrue(rule.qualifierMatch("1data".getBytes("UTF-8")));
+    Assert.assertTrue(rule.qualifierMatch("foobardata".getBytes("UTF-8")));
+    Assert.assertFalse(rule.qualifierMatch("foobardat".getBytes("UTF-8")));
+    Assert.assertFalse(rule.qualifierMatch("d".getBytes("UTF-8")));
+    Assert.assertFalse(rule.qualifierMatch("".getBytes("UTF-8")));
+  }
+
+}
diff --git a/hbase-kafka-proxy/src/test/java/org/apache/hadoop/hbase/kafka/TestRouteRules.java b/hbase-kafka-proxy/src/test/java/org/apache/hadoop/hbase/kafka/TestRouteRules.java
new file mode 100644
index 0000000..5d387c1
--- /dev/null
+++ b/hbase-kafka-proxy/src/test/java/org/apache/hadoop/hbase/kafka/TestRouteRules.java
@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hbase.kafka;
+
+import java.io.ByteArrayInputStream;
+
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Test route rules
+ */
+@Category(SmallTests.class)
+public class TestRouteRules {
+  private static final String ROUTE_RULE1 =
+      "<rules><rule action=\"route\" table=\"default:MyTable\" "
+      + "topic=\"foo\"/></rules>";
+  private static final String ROUTE_RULE2 =
+      "<rules><rule action=\"route\" table=\"default:MyTable\" "
+      + "columnFamily=\"data\" topic=\"foo\"/></rules>";
+  private static final String ROUTE_RULE3 =
+      "<rules><rule action=\"route\" table=\"default:MyTable\" "
+      + "columnFamily=\"data\" qualifier=\"dhold\" topic=\"foo\"/></rules>";
+
+  private static final String ROUTE_RULE4 =
+      "<rules><rule action=\"route\" table=\"default:MyTable\" "
+      + "columnFamily=\"data\" qualifier=\"dhold:*\" topic=\"foo\" /></rules>";
+  private static final String ROUTE_RULE5 =
+      "<rules><rule action=\"route\" table=\"default:MyTable\" "
+      + "columnFamily=\"data\" qualifier=\"*pickme\" topic=\"foo\" /></rules>";
+
+  private static final String ROUTE_RULE6 =
+      "<rules><rule action=\"route\" table=\"default:MyTable\" "
+      + "columnFamily=\"data\" qualifier=\"*pickme*\" topic=\"foo\"  /></rules>";
+
+  @Test
+  public void testTopic1() {
+    TopicRoutingRules rules = new TopicRoutingRules();
+    try {
+      rules.parseRules(new ByteArrayInputStream(ROUTE_RULE1.getBytes("UTF-8")));
+      Assert.assertEquals(1, rules.getRouteRules().size());
+      Assert.assertEquals(TableName.valueOf("default:MyTable"),
+        rules.getRouteRules().get(0).getTableName());
+      Assert.assertEquals(null, rules.getRouteRules().get(0).getColumnFamily());
+      Assert.assertEquals(null, rules.getRouteRules().get(0).getQualifier());
+      Assert.assertEquals(0, rules.getDropRules().size());
+    } catch (Exception e) {
+      e.printStackTrace();
+      Assert.fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testTopic2() {
+    TopicRoutingRules rules = new TopicRoutingRules();
+    try {
+      rules.parseRules(new ByteArrayInputStream(ROUTE_RULE2.getBytes("UTF-8")));
+      Assert.assertEquals(1, rules.getRouteRules().size());
+      Assert.assertEquals(TableName.valueOf("default:MyTable"),
+        rules.getRouteRules().get(0).getTableName());
+      Assert.assertTrue(
+        Bytes.equals("data".getBytes("UTF-8"), rules.getRouteRules().get(0).getColumnFamily()));
+      Assert.assertEquals(null, rules.getRouteRules().get(0).getQualifier());
+      Assert.assertEquals(0, rules.getDropRules().size());
+    } catch (Exception e) {
+      e.printStackTrace();
+      Assert.fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testTopic3() {
+    TopicRoutingRules rules = new TopicRoutingRules();
+    try {
+      rules.parseRules(new ByteArrayInputStream(ROUTE_RULE3.getBytes("UTF-8")));
+      Assert.assertEquals(1, rules.getRouteRules().size());
+      Assert.assertEquals(TableName.valueOf("default:MyTable"),
+        rules.getRouteRules().get(0).getTableName());
+      Assert.assertTrue(
+        Bytes.equals("data".getBytes("UTF-8"), rules.getRouteRules().get(0).getColumnFamily()));
+      Assert.assertTrue(
+        Bytes.equals("dhold".getBytes("UTF-8"), rules.getRouteRules().get(0).getQualifier()));
+      Assert.assertTrue(rules.getRouteRules().get(0).getTopics().contains("foo"));
+      Assert.assertEquals(rules.getRouteRules().get(0).getTopics().size(), 1);
+
+      Assert.assertEquals(0, rules.getDropRules().size());
+    } catch (Exception e) {
+      e.printStackTrace();
+      Assert.fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testTopic4() {
+    TopicRoutingRules rules = new TopicRoutingRules();
+    try {
+      rules.parseRules(new ByteArrayInputStream(ROUTE_RULE4.getBytes("UTF-8")));
+      Assert.assertEquals(1, rules.getRouteRules().size());
+      Assert.assertEquals(TableName.valueOf("default:MyTable"),
+        rules.getRouteRules().get(0).getTableName());
+      Assert.assertTrue(
+        Bytes.equals("data".getBytes("UTF-8"), rules.getRouteRules().get(0).getColumnFamily()));
+      Assert.assertTrue(
+        Bytes.equals("dhold:".getBytes("UTF-8"), rules.getRouteRules().get(0).getQualifier()));
+      Assert.assertEquals(0, rules.getDropRules().size());
+
+      TopicRule route = rules.getRouteRules().get(0);
+      Assert.assertFalse(
+        route.match(TableName.valueOf("default:MyTable"),
+                "data".getBytes("UTF-8"),
+                "blah".getBytes("UTF-8")));
+      Assert.assertFalse(
+        route.match(TableName.valueOf("default:MyTable"),
+                "data".getBytes("UTF-8"),
+                "dholdme".getBytes("UTF-8")));
+      Assert.assertTrue(route.match(TableName.valueOf("default:MyTable"),
+              "data".getBytes("UTF-8"),
+              "dhold:me".getBytes("UTF-8")));
+
+    } catch (Exception e) {
+      e.printStackTrace();
+      Assert.fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testTopic5() {
+    TopicRoutingRules rules = new TopicRoutingRules();
+    try {
+      rules.parseRules(new ByteArrayInputStream(ROUTE_RULE5.getBytes("UTF-8")));
+      Assert.assertEquals(1, rules.getRouteRules().size());
+      Assert.assertEquals(TableName.valueOf("default:MyTable"),
+        rules.getRouteRules().get(0).getTableName());
+      Assert.assertTrue(
+        Bytes.equals("data".getBytes("UTF-8"), rules.getRouteRules().get(0).getColumnFamily()));
+      Assert.assertTrue(
+        Bytes.equals("pickme".getBytes("UTF-8"), rules.getRouteRules().get(0).getQualifier()));
+      Assert.assertEquals(0, rules.getDropRules().size());
+
+      TopicRule route = rules.getRouteRules().get(0);
+      Assert.assertFalse(
+        route.match(TableName.valueOf("default:MyTable"),
+                "data".getBytes("UTF-8"),
+                "blah".getBytes("UTF-8")));
+      Assert.assertFalse(route.match(TableName.valueOf("default:MyTable"),
+              "data".getBytes("UTF-8"),
+              "blacickme".getBytes("UTF-8")));
+      Assert.assertTrue(route.match(TableName.valueOf("default:MyTable"),
+              "data".getBytes("UTF-8"),
+              "hithere.pickme".getBytes("UTF-8")));
+
+    } catch (Exception e) {
+      e.printStackTrace();
+      Assert.fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testTopic6() {
+    TopicRoutingRules rules = new TopicRoutingRules();
+    try {
+      rules.parseRules(new ByteArrayInputStream(ROUTE_RULE6.getBytes("UTF-8")));
+      Assert.assertEquals(1, rules.getRouteRules().size());
+      Assert.assertEquals(TableName.valueOf("default:MyTable"),
+        rules.getRouteRules().get(0).getTableName());
+      Assert.assertTrue(
+        Bytes.equals("data".getBytes("UTF-8"),
+                rules.getRouteRules().get(0).getColumnFamily()));
+      Assert.assertTrue(
+        Bytes.equals("pickme".getBytes("UTF-8"),
+                rules.getRouteRules().get(0).getQualifier()));
+      Assert.assertEquals(0, rules.getDropRules().size());
+
+      TopicRule route = rules.getRouteRules().get(0);
+      Assert.assertFalse(
+        route.match(TableName.valueOf("default:MyTable"),
+                "data".getBytes("UTF-8"),
+                "blah".getBytes("UTF-8")));
+      Assert.assertFalse(route.match(TableName.valueOf("default:MyTable"),
+              "data".getBytes("UTF-8"),
+        "blacickme".getBytes("UTF-8")));
+      Assert.assertTrue(route.match(TableName.valueOf("default:MyTable"),
+              "data".getBytes("UTF-8"),
+        "hithere.pickme".getBytes("UTF-8")));
+      Assert.assertTrue(route.match(TableName.valueOf("default:MyTable"),
+              "data".getBytes("UTF-8"),
+        "pickme.pleaze.do.it".getBytes("UTF-8")));
+      Assert.assertFalse(route.match(TableName.valueOf("default:MyTable"),
+              "data".getBytes("UTF-8"),
+        "please.pickme.pleaze".getBytes("UTF-8")));
+      Assert.assertTrue(route.match(TableName.valueOf("default:MyTable"),
+              "data".getBytes("UTF-8"),
+        "pickme.pleaze.pickme".getBytes("UTF-8")));
+
+    } catch (Exception e) {
+      e.printStackTrace();
+      Assert.fail(e.getMessage());
+    }
+  }
+
+}
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..f487f47
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,404 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0   http://maven.apache.org/maven-v4_0_0.xsd">
+  <!--
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+  ON MVN COMPILE NOT WORKING
+
+  If you are wondering why 'mvn compile' does not work building HBase
+  (in particular, if you are doing it for the first time), instead do
+  'mvn package'.  If you are interested in the full story, see
+  https://issues.apache.org/jira/browse/HBASE-6795.
+
+-->
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache</groupId>
+    <artifactId>apache</artifactId>
+    <version>18</version>
+    <relativePath/>
+    <!-- no parent resolution -->
+  </parent>
+  <groupId>org.apache.hbase</groupId>
+  <artifactId>hbase-connectors</artifactId>
+  <version>1.0.0-SNAPSHOT</version>
+  <name>Apache HBase Connectors</name>
+  <packaging>pom</packaging>
+  <description>
+    Connectors to Apache HBase.
+  </description>
+  <url>http://hbase.apache.org</url>
+  <inceptionYear>2018</inceptionYear>
+  <licenses>
+    <license>
+      <name>Apache License, Version 2.0</name>
+      <url>https://www.apache.org/licenses/LICENSE-2.0.txt</url>
+      <distribution>repo</distribution>
+    </license>
+  </licenses>
+  <modules>
+    <module>hbase-kafka-model</module>
+    <module>hbase-kafka-proxy</module>
+  </modules>
+  <scm>
+    <connection>scm:git:git://gitbox.apache.org/repos/asf/hbase-connectors.git</connection>
+    <developerConnection>scm:git:https://gitbox.apache.org/repos/asf/hbase-connectors.git</developerConnection>
+    <url>https://gitbox.apache.org/repos/asf?p=hbase-connectors.git;a=summary</url>
+  </scm>
+  <issueManagement>
+    <!--File issues for this project against Apache HBase JIRA-->
+    <system>JIRA</system>
+    <url>http://issues.apache.org/jira/browse/HBASE</url>
+  </issueManagement>
+  <ciManagement>
+    <system>hudson</system>
+    <url>http://hudson.zones.apache.org/hudson/view/HBase/job/HBase-TRUNK/</url>
+  </ciManagement>
+  <mailingLists>
+    <mailingList>
+      <name>User List</name>
+      <subscribe>user-subscribe@hbase.apache.org</subscribe>
+      <unsubscribe>user-unsubscribe@hbase.apache.org</unsubscribe>
+      <post>user@hbase.apache.org</post>
+      <archive>http://mail-archives.apache.org/mod_mbox/hbase-user/</archive>
+      <otherArchives>
+        <otherArchive>http://dir.gmane.org/gmane.comp.java.hadoop.hbase.user</otherArchive>
+        <otherArchive>http://search-hadoop.com/?q=&amp;fc_project=HBase</otherArchive>
+      </otherArchives>
+    </mailingList>
+    <mailingList>
+      <name>Developer List</name>
+      <subscribe>dev-subscribe@hbase.apache.org</subscribe>
+      <unsubscribe>dev-unsubscribe@hbase.apache.org</unsubscribe>
+      <post>dev@hbase.apache.org</post>
+      <archive>http://mail-archives.apache.org/mod_mbox/hbase-dev/</archive>
+      <otherArchives>
+        <otherArchive>http://dir.gmane.org/gmane.comp.java.hadoop.hbase.devel</otherArchive>
+        <otherArchive>http://search-hadoop.com/?q=&amp;fc_project=HBase</otherArchive>
+      </otherArchives>
+    </mailingList>
+    <mailingList>
+      <name>Commits List</name>
+      <subscribe>commits-subscribe@hbase.apache.org</subscribe>
+      <unsubscribe>commits-unsubscribe@hbase.apache.org</unsubscribe>
+      <archive>http://mail-archives.apache.org/mod_mbox/hbase-commits/</archive>
+    </mailingList>
+    <mailingList>
+      <name>Issues List</name>
+      <subscribe>issues-subscribe@hbase.apache.org</subscribe>
+      <unsubscribe>issues-unsubscribe@hbase.apache.org</unsubscribe>
+      <archive>http://mail-archives.apache.org/mod_mbox/hbase-issues/</archive>
+    </mailingList>
+    <mailingList>
+      <name>Builds List</name>
+      <subscribe>builds-subscribe@hbase.apache.org</subscribe>
+      <unsubscribe>builds-unsubscribe@hbase.apache.org</unsubscribe>
+      <archive>http://mail-archives.apache.org/mod_mbox/hbase-builds/</archive>
+    </mailingList>
+  </mailingLists>
+  <developers/>
+  <!--TODO-->
+  <properties>
+    <maven.javadoc.skip>true</maven.javadoc.skip>
+    <maven.build.timestamp.format>yyyy-MM-dd'T'HH:mm</maven.build.timestamp.format>
+    <buildDate>${maven.build.timestamp}</buildDate>
+    <compileSource>1.8</compileSource>
+    <java.min.version>${compileSource}</java.min.version>
+    <maven.min.version>3.3.3</maven.min.version>
+    <rename.offset>org.apache.hadoop.hbase.shaded</rename.offset>
+    <avro.version>1.7.7</avro.version>
+    <hbase.version>2.1.0</hbase.version>
+    <maven.compiler.version>3.6.1</maven.compiler.version>
+  </properties>
+  <dependencyManagement>
+    <dependencies>
+      <!-- Avro dependencies we mostly get transitively, manual version coalescing -->
+      <dependency>
+        <groupId>org.apache.avro</groupId>
+        <artifactId>avro</artifactId>
+        <version>${avro.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.hbase</groupId>
+        <artifactId>hbase-common</artifactId>
+        <version>${hbase.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.hbase</groupId>
+        <artifactId>hbase-annotations</artifactId>
+        <version>${hbase.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.hbase</groupId>
+        <artifactId>hbase-annotations</artifactId>
+        <version>${hbase.version}</version>
+        <type>test-jar</type>
+        <scope>test</scope>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.hbase</groupId>
+        <artifactId>hbase-common</artifactId>
+        <version>${hbase.version}</version>
+        <type>test-jar</type>
+        <scope>test</scope>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.hbase</groupId>
+        <artifactId>hbase-zookeeper</artifactId>
+        <version>${hbase.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.hbase</groupId>
+        <artifactId>hbase-server</artifactId>
+        <version>${hbase.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.hbase</groupId>
+        <artifactId>hbase-client</artifactId>
+        <version>${hbase.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.hbase.connectors</groupId>
+        <artifactId>hbase-kafka-model</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+    </dependencies>
+  </dependencyManagement>
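With the versions pinned in the dependencyManagement block above, downstream modules only have to name the artifacts they use. As a minimal illustrative sketch (not taken from this commit), a connector module could pull in the managed model artifact without repeating its version:

    <dependencies>
      <dependency>
        <!-- version is inherited from the parent's dependencyManagement -->
        <groupId>org.apache.hbase.connectors</groupId>
        <artifactId>hbase-kafka-model</artifactId>
      </dependency>
    </dependencies>

Centralizing the versions here is what lets hbase.version and avro.version be bumped for every module in one place.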
+  <build>
+    <pluginManagement>
+      <plugins>
+        <plugin>
+          <groupId>org.apache.maven.plugins</groupId>
+          <artifactId>maven-release-plugin</artifactId>
+          <configuration>
+            <!--You need this profile; it signs your artifacts.
+              I'm not sure whether this configuration actually works, though,
+              so I've been specifying -Papache-release on the command line.
+           -->
+            <releaseProfiles>apache-release</releaseProfiles>
+            <!--This stops us running the tests for each stage of the maven release,
+              but it still builds the test jar. See SUREFIRE-172.
+            -->
+            <arguments>-Dmaven.test.skip.exec ${arguments}</arguments>
+            <goals>${goals}</goals>
+            <pomFileName>pom.xml</pomFileName>
+          </configuration>
+        </plugin>
+        <plugin>
+          <groupId>org.apache.maven.plugins</groupId>
+          <artifactId>maven-compiler-plugin</artifactId>
+          <version>${maven.compiler.version}</version>
+          <configuration>
+            <source>${compileSource}</source>
+            <target>${compileSource}</target>
+            <showWarnings>true</showWarnings>
+            <showDeprecation>false</showDeprecation>
+            <useIncrementalCompilation>false</useIncrementalCompilation>
+            <compilerArgument>-Xlint:-options</compilerArgument>
+          </configuration>
+        </plugin>
+        <plugin>
+          <groupId>org.codehaus.mojo</groupId>
+          <artifactId>build-helper-maven-plugin</artifactId>
+          <version>1.9.1</version>
+        </plugin>
+        <plugin>
+          <groupId>org.apache.rat</groupId>
+          <artifactId>apache-rat-plugin</artifactId>
+          <version>0.11</version>
+          <configuration>
+            <excludes>
+              <exclude>**/*.versionsBackup</exclude>
+              <exclude>**/*.log</exclude>
+              <exclude>**/.*</exclude>
+              <exclude>**/*.tgz</exclude>
+              <exclude>**/*.orig</exclude>
+              <exclude>**/.idea/**</exclude>
+              <exclude>**/*.iml</exclude>
+              <exclude>**/CHANGES.txt</exclude>
+              <exclude>**/generated/**</exclude>
+              <exclude>**/gen-*/**</exclude>
+              <exclude>**/*.avpr</exclude>
+              <exclude>**/*.svg</exclude>
+              <!-- vector graphics -->
+              <exclude>**/*.vm</exclude>
+              <!-- apache doxia generated -->
+              <exclude>**/control</exclude>
+              <exclude>**/conffile</exclude>
+              <!-- auto-gen docs -->
+              <exclude>docs/*</exclude>
+              <exclude>logs/*</exclude>
+              <!--  exclude source control files -->
+              <exclude>.git/**</exclude>
+              <exclude>.svn/**</exclude>
+              <exclude>**/.settings/**</exclude>
+              <exclude>**/patchprocess/**</exclude>
+              <exclude>**/dependency-reduced-pom.xml</exclude>
+              <exclude>**/rat.txt</exclude>
+              <!-- exclude the shaded protobuf files -->
+              <exclude>**/src/main/patches/**</exclude>
+              <!--Exclude the unpacked google src-->
+              <exclude>**/src/main/java/com/google/protobuf/**</exclude>
+              <exclude>**/src/main/java/google/**</exclude>
+              <exclude>**/src/main/java/META-INF/**</exclude>
+            </excludes>
+          </configuration>
+        </plugin>
+        <plugin>
+          <groupId>org.codehaus.mojo</groupId>
+          <artifactId>buildnumber-maven-plugin</artifactId>
+          <version>1.4</version>
+          <executions>
+            <execution>
+              <phase>validate</phase>
+              <goals>
+                <goal>create-timestamp</goal>
+              </goals>
+            </execution>
+          </executions>
+          <configuration>
+            <timestampFormat>yyyy</timestampFormat>
+            <timestampPropertyName>build.year</timestampPropertyName>
+          </configuration>
+        </plugin>
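The buildnumber-maven-plugin execution above runs create-timestamp during validate and publishes the four-digit year as the build.year property. How that property is consumed is not shown in this excerpt; a hedged sketch, assuming a module filters a resource (for example a NOTICE or version file) to stamp the year, might look like:

    <build>
      <resources>
        <resource>
          <directory>src/main/resources</directory>
          <!-- replaces ${build.year} tokens in the copied resources -->
          <filtering>true</filtering>
        </resource>
      </resources>
    </build>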
+        <plugin>
+          <groupId>org.apache.maven.plugins</groupId>
+          <artifactId>maven-shade-plugin</artifactId>
+          <version>3.0.0</version>
+        </plugin>
+      </plugins>
+    </pluginManagement>
+    <plugins>
+      <plugin>
+        <artifactId>maven-clean-plugin</artifactId>
+        <configuration>
+          <filesets>
+            <fileset>
+              <directory>${basedir}</directory>
+              <includes>
+                <include>dependency-reduced-pom.xml</include>
+              </includes>
+            </fileset>
+          </filesets>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-enforcer-plugin</artifactId>
+        <version>1.4</version>
+        <dependencies>
+          <dependency>
+            <groupId>org.codehaus.mojo</groupId>
+            <artifactId>extra-enforcer-rules</artifactId>
+            <version>1.0-beta-6</version>
+          </dependency>
+        </dependencies>
+        <!-- version set by parent -->
+        <executions>
+          <execution>
+            <id>min-maven-min-java-banned-xerces</id>
+            <goals>
+              <goal>enforce</goal>
+            </goals>
+            <configuration>
+              <rules>
+                <!-- The earliest maven version we verify builds for via ASF Jenkins -->
+                <requireMavenVersion>
+                  <version>[${maven.min.version},)</version>
+                  <message>Maven is out of date.
+  HBase requires at least version ${maven.min.version} of Maven to properly build from source.
+  You appear to be using an older version. You can use either "mvn -version" or
+  "mvn enforcer:display-info" to verify what version is active.
+  See the reference guide on building for more information: http://hbase.apache.org/book.html#build
+                  </message>
+                </requireMavenVersion>
+                <!-- The earliest JVM version we verify builds for via ASF Jenkins -->
+                <requireJavaVersion>
+                  <version>[${java.min.version},)</version>
+                  <message>Java is out of date.
+  HBase requires at least version ${java.min.version} of the JDK to properly build from source.
+  See the reference guide on building for more information: http://hbase.apache.org/book.html#build
+                  </message>
+                </requireJavaVersion>
+                <bannedDependencies>
+                  <excludes>
+                    <exclude>xerces:xercesImpl</exclude>
+                  </excludes>
+                  <message>We avoid adding our own Xerces jars to the classpath, see HBASE-16340.</message>
+                </bannedDependencies>
+              </rules>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <artifactId>maven-assembly-plugin</artifactId>
+        <version>3.0.0</version>
+        <configuration>
+          <descriptors>
+            <descriptor>src/main/assembly/src.xml</descriptor>
+          </descriptors>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
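The maven-assembly-plugin configured above reads a source-assembly descriptor from src/main/assembly/src.xml, which is not reproduced in this excerpt. Purely as a hypothetical sketch, such a descriptor usually names an id, an archive format, and the filesets to bundle:

    <assembly xmlns="http://maven.apache.org/ASSEMBLY/2.0.0">
      <id>src</id>
      <formats>
        <format>tar.gz</format>
      </formats>
      <fileSets>
        <fileSet>
          <directory>${project.basedir}</directory>
          <excludes>
            <!-- keep build output out of the source bundle -->
            <exclude>**/target/**</exclude>
          </excludes>
        </fileSet>
      </fileSets>
    </assembly>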
+  <profiles>
+    <!-- this profile should be activated for release builds -->
+    <profile>
+      <id>release</id>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.rat</groupId>
+            <artifactId>apache-rat-plugin</artifactId>
+            <executions>
+              <execution>
+                <phase>package</phase>
+                <goals>
+                  <goal>check</goal>
+                </goals>
+              </execution>
+            </executions>
+          </plugin>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-enforcer-plugin</artifactId>
+            <configuration>
+              <rules>
+                <enforceBytecodeVersion>
+                  <maxJdkVersion>${compileSource}</maxJdkVersion>
+                  <message>HBase has unsupported dependencies.
+  HBase requires that all dependencies be compiled with version ${compileSource} or earlier
+  of the JDK to properly build from source.  You appear to be using a newer dependency. You can use
+  either "mvn -version" or "mvn enforcer:display-info" to verify what version is active.
+  Non-release builds can temporarily build with a newer JDK version by setting the
+  'compileSource' property (e.g. mvn -DcompileSource=1.8 clean package).
+                </message>
+                </enforceBytecodeVersion>
+              </rules>
+            </configuration>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+  </profiles>
+</project>
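Taken together, the release profile above hooks the RAT license check and the bytecode-version enforcement into the package phase, while artifact signing comes from the apache-release profile mentioned in the maven-release-plugin comment. The exact commands used for a release are not part of this commit; assuming the usual Apache parent defaults, an invocation along these lines would activate the two profiles:

    mvn clean install -Prelease
    mvn release:prepare release:perform -Papache-release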