You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by ja...@apache.org on 2014/06/05 18:29:47 UTC

git commit: FLUME-2397: HBase-98 compatibility

Repository: flume
Updated Branches:
  refs/heads/trunk 09472ba12 -> 0cba73698


FLUME-2397: HBase-98 compatibility

(Hari Shreedharan via Jarek Jarcec Cecho)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/0cba7369
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/0cba7369
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/0cba7369

Branch: refs/heads/trunk
Commit: 0cba73698dbba6b78d0a2cd7b469f4377723470a
Parents: 09472ba
Author: Jarek Jarcec Cecho <ja...@apache.org>
Authored: Thu Jun 5 09:28:02 2014 -0700
Committer: Jarek Jarcec Cecho <ja...@apache.org>
Committed: Thu Jun 5 09:28:02 2014 -0700

----------------------------------------------------------------------
 flume-ng-sinks/flume-hdfs-sink/pom.xml          |  30 +++
 flume-ng-sinks/flume-ng-hbase-sink/pom.xml      | 114 ++++++++++--
 .../apache/flume/sink/hbase/AsyncHBaseSink.java |  28 ++-
 .../flume/sink/hbase/TestAsyncHBaseSink.java    |   1 +
 pom.xml                                         | 182 +++++++++++++++++--
 5 files changed, 322 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/0cba7369/flume-ng-sinks/flume-hdfs-sink/pom.xml
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/pom.xml b/flume-ng-sinks/flume-hdfs-sink/pom.xml
index e0760ae..83f8bec 100644
--- a/flume-ng-sinks/flume-hdfs-sink/pom.xml
+++ b/flume-ng-sinks/flume-hdfs-sink/pom.xml
@@ -161,6 +161,36 @@ limitations under the License.
       </dependencies>
     </profile>
 
+    <profile>
+      <id>hbase-98</id>
+      <activation>
+        <property>
+          <name>hadoop.profile</name>
+          <value>hbase-98</value>
+        </property>
+      </activation>
+      <dependencies>
+
+        <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-hdfs</artifactId>
+          <optional>true</optional>
+        </dependency>
+
+        <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-auth</artifactId>
+          <optional>true</optional>
+        </dependency>
+
+        <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-minicluster</artifactId>
+          <scope>test</scope>
+        </dependency>
+
+      </dependencies>
+    </profile>
   </profiles>
 
 </project>

http://git-wip-us.apache.org/repos/asf/flume/blob/0cba7369/flume-ng-sinks/flume-ng-hbase-sink/pom.xml
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/pom.xml b/flume-ng-sinks/flume-ng-hbase-sink/pom.xml
index ddb1163..cc2bbee 100644
--- a/flume-ng-sinks/flume-ng-hbase-sink/pom.xml
+++ b/flume-ng-sinks/flume-ng-hbase-sink/pom.xml
@@ -56,18 +56,6 @@
       <artifactId>guava</artifactId>
     </dependency>
 
-    <dependency>
-      <groupId>org.apache.hbase</groupId>
-      <artifactId>hbase</artifactId>
-      <optional>true</optional>
-    </dependency>
-
-    <dependency>
-      <groupId>org.apache.hbase</groupId>
-      <artifactId>hbase</artifactId>
-      <classifier>tests</classifier>
-      <scope>test</scope>
-    </dependency>
 
     <dependency>
       <groupId>org.hbase</groupId>
@@ -142,6 +130,24 @@
           <artifactId>jersey-core</artifactId>
           <scope>test</scope>
         </dependency>
+        <dependency>
+          <groupId>org.apache.hbase</groupId>
+          <artifactId>hbase</artifactId>
+          <optional>true</optional>
+        </dependency>
+
+        <dependency>
+          <groupId>org.apache.hbase</groupId>
+          <artifactId>hbase</artifactId>
+          <classifier>tests</classifier>
+          <scope>test</scope>
+        </dependency>
+
+        <dependency>
+          <groupId>org.apache.zookeeper</groupId>
+          <artifactId>zookeeper</artifactId>
+          <scope>test</scope>
+        </dependency>
       </dependencies>
     </profile>
     <profile>
@@ -158,6 +164,90 @@
           <artifactId>hadoop-minicluster</artifactId>
           <scope>test</scope>
         </dependency>
+        <dependency>
+          <groupId>org.apache.hbase</groupId>
+          <artifactId>hbase</artifactId>
+          <optional>true</optional>
+        </dependency>
+
+        <dependency>
+          <groupId>org.apache.hbase</groupId>
+          <artifactId>hbase</artifactId>
+          <classifier>tests</classifier>
+          <scope>test</scope>
+        </dependency>
+
+        <dependency>
+          <groupId>org.apache.zookeeper</groupId>
+          <artifactId>zookeeper</artifactId>
+          <scope>test</scope>
+        </dependency>
+      </dependencies>
+    </profile>
+    <profile>
+      <id>hbase-98</id>
+      <activation>
+        <property>
+          <name>hadoop.profile</name>
+          <value>hbase-98</value>
+        </property>
+      </activation>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-minicluster</artifactId>
+          <scope>test</scope>
+        </dependency>
+
+        <dependency>
+          <groupId>org.apache.hbase</groupId>
+          <artifactId>hbase-client</artifactId>
+          <optional>true</optional>
+        </dependency>
+
+        <dependency>
+          <groupId>org.apache.hbase</groupId>
+          <artifactId>hbase-client</artifactId>
+          <classifier>tests</classifier>
+          <scope>test</scope>
+        </dependency>
+
+
+        <dependency>
+          <groupId>org.apache.hbase</groupId>
+          <artifactId>hbase-server</artifactId>
+          <scope>test</scope>
+        </dependency>
+
+        <dependency>
+          <groupId>org.apache.hbase</groupId>
+          <artifactId>hbase-server</artifactId>
+          <classifier>tests</classifier>
+          <scope>test</scope>
+        </dependency>
+
+        <!-- There should be no need for Flume to include the following two
+         artifacts, but the HBase pom has a bug which causes them not to get
+         pulled in, so we have to pull them in. Ideally this should be optional,
+         but making it optional causes the build to fail.
+        -->
+
+        <dependency>
+          <groupId>org.apache.hbase</groupId>
+          <artifactId>hbase-common</artifactId>
+          <optional>true</optional>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.hbase</groupId>
+          <artifactId>hbase-testing-util</artifactId>
+          <scope>test</scope>
+        </dependency>
+
+        <dependency>
+          <groupId>org.apache.zookeeper</groupId>
+          <artifactId>zookeeper</artifactId>
+          <scope>test</scope>
+        </dependency>
       </dependencies>
     </profile>
   </profiles>

http://git-wip-us.apache.org/repos/asf/flume/blob/0cba7369/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java
index 2d03271..1666be4 100644
--- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java
+++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java
@@ -47,6 +47,8 @@ import org.apache.hadoop.hbase.zookeeper.ZKConfig;
 import org.hbase.async.AtomicIncrementRequest;
 import org.hbase.async.HBaseClient;
 import org.hbase.async.PutRequest;
+import org.jboss.netty.channel.socket.nio
+  .NioClientSocketChannelFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -409,13 +411,17 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
             + "before calling start on an old instance.");
     sinkCounter.start();
     sinkCounter.incrementConnectionCreatedCount();
-    if (!isTimeoutTest) {
       sinkCallbackPool = Executors.newCachedThreadPool(new ThreadFactoryBuilder()
         .setNameFormat(this.getName() + " HBase Call Pool").build());
+    logger.info("Callback pool created");
+    if(!isTimeoutTest) {
+      client = new HBaseClient(zkQuorum, zkBaseDir, sinkCallbackPool);
     } else {
-      sinkCallbackPool = Executors.newSingleThreadExecutor();
+      client = new HBaseClient(zkQuorum, zkBaseDir,
+        new NioClientSocketChannelFactory(Executors
+          .newSingleThreadExecutor(),
+          Executors.newSingleThreadExecutor()));
     }
-    client = new HBaseClient(zkQuorum, zkBaseDir, sinkCallbackPool);
     final CountDownLatch latch = new CountDownLatch(1);
     final AtomicBoolean fail = new AtomicBoolean(false);
     client.ensureTableFamilyExists(
@@ -424,6 +430,7 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
               @Override
               public Object call(Object arg) throws Exception {
                 latch.countDown();
+                logger.info("table found");
                 return null;
               }
             },
@@ -437,7 +444,9 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
             });
 
     try {
+      logger.info("waiting on callback");
       latch.await();
+      logger.info("callback received");
     } catch (InterruptedException e) {
       sinkCounter.incrementConnectionFailedCount();
       throw new FlumeException(
@@ -465,15 +474,20 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
     }
     sinkCounter.incrementConnectionClosedCount();
     sinkCounter.stop();
-    sinkCallbackPool.shutdown();
+
     try {
-      if(!sinkCallbackPool.awaitTermination(5, TimeUnit.SECONDS)) {
-        sinkCallbackPool.shutdownNow();
+      if (sinkCallbackPool != null) {
+        sinkCallbackPool.shutdown();
+        if (!sinkCallbackPool.awaitTermination(5, TimeUnit.SECONDS)) {
+          sinkCallbackPool.shutdownNow();
+        }
       }
     } catch (InterruptedException e) {
       logger.error("Interrupted while waiting for asynchbase sink pool to " +
         "die", e);
-      sinkCallbackPool.shutdownNow();
+      if (sinkCallbackPool != null) {
+        sinkCallbackPool.shutdownNow();
+      }
     }
     sinkCallbackPool = null;
     client = null;

http://git-wip-us.apache.org/repos/asf/flume/blob/0cba7369/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java
index ccbc086..af90f99 100644
--- a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java
+++ b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java
@@ -209,6 +209,7 @@ public class TestAsyncHBaseSink {
     Channel channel = new MemoryChannel();
     Configurables.configure(channel, ctx);
     sink.setChannel(channel);
+    channel.start();
     sink.start();
     Transaction tx = channel.getTransaction();
     tx.begin();

http://git-wip-us.apache.org/repos/asf/flume/blob/0cba7369/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index d4b7660..5d31d4c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -97,6 +97,25 @@ limitations under the License.
             <artifactId>hadoop-test</artifactId>
             <version>${hadoop.version}</version>
           </dependency>
+          <dependency>
+            <groupId>org.apache.hbase</groupId>
+            <artifactId>hbase</artifactId>
+            <version>${hbase.version}</version>
+          </dependency>
+
+          <dependency>
+            <groupId>org.apache.hbase</groupId>
+            <artifactId>hbase</artifactId>
+            <version>${hbase.version}</version>
+            <classifier>tests</classifier>
+            <scope>test</scope>
+          </dependency>
+          <dependency>
+            <groupId>org.apache.zookeeper</groupId>
+            <artifactId>zookeeper</artifactId>
+            <version>3.4.5</version>
+            <scope>test</scope>
+          </dependency>
         </dependencies>
       </dependencyManagement>
     </profile>
@@ -148,6 +167,27 @@ limitations under the License.
             <version>${hadoop.version}</version>
           </dependency>
 
+          <dependency>
+            <groupId>org.apache.hbase</groupId>
+            <artifactId>hbase</artifactId>
+            <version>${hbase.version}</version>
+          </dependency>
+
+          <dependency>
+            <groupId>org.apache.hbase</groupId>
+            <artifactId>hbase</artifactId>
+            <version>${hbase.version}</version>
+            <classifier>tests</classifier>
+            <scope>test</scope>
+          </dependency>
+
+          <dependency>
+            <groupId>org.apache.zookeeper</groupId>
+            <artifactId>zookeeper</artifactId>
+            <version>3.4.5</version>
+            <scope>test</scope>
+          </dependency>
+
           <!-- only compatible with hadoop-2 -->
           <dependency>
             <groupId>org.apache.flume.flume-ng-sinks</groupId>
@@ -159,6 +199,134 @@ limitations under the License.
     </profile>
 
     <profile>
+      <id>hbase-98</id>
+      <activation>
+        <property>
+          <name>hadoop.profile</name>
+          <value>hbase-98</value>
+        </property>
+      </activation>
+      <properties>
+        <hadoop.version>${hadoop2.version}</hadoop.version>
+        <hbase.version>0.98.2-hadoop2</hbase.version>
+        <hadoop.common.artifact.id>hadoop-common</hadoop.common.artifact.id>
+        <thrift.version>0.8.0</thrift.version>
+      </properties>
+      <dependencyManagement>
+        <dependencies>
+          <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>${hadoop.common.artifact.id}</artifactId>
+            <version>${hadoop.version}</version>
+          </dependency>
+          <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-hdfs</artifactId>
+            <version>${hadoop.version}</version>
+          </dependency>
+          <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-minicluster</artifactId>
+            <version>${hadoop.version}</version>
+          </dependency>
+          <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-client</artifactId>
+            <version>${hadoop.version}</version>
+          </dependency>
+          <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-annotations</artifactId>
+            <version>${hadoop.version}</version>
+          </dependency>
+          <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-auth</artifactId>
+            <version>${hadoop.version}</version>
+          </dependency>
+          <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-minicluster</artifactId>
+            <version>${hadoop.version}</version>
+            <scope>test</scope>
+          </dependency>
+
+          <!-- Ideally this should be optional, but making it optional causes
+          the build to fail.
+          -->
+          <dependency>
+            <groupId>org.apache.hbase</groupId>
+            <artifactId>hbase-client</artifactId>
+            <version>${hbase.version}</version>
+          </dependency>
+
+          <dependency>
+            <groupId>org.apache.hbase</groupId>
+            <artifactId>hbase-client</artifactId>
+            <version>${hbase.version}</version>
+            <classifier>tests</classifier>
+            <scope>test</scope>
+          </dependency>
+
+          <dependency>
+            <groupId>org.apache.hbase</groupId>
+            <artifactId>hbase-testing-util</artifactId>
+            <version>${hbase.version}</version>
+          </dependency>
+
+          <!-- There should be no need for Flume to include the following two
+           artifacts, but the HBase pom has a bug which causes them not to get
+           pulled in, so we have to pull them in. Ideally this should be optional,
+           but making it optional causes the build to fail.
+          -->
+
+          <dependency>
+            <groupId>org.apache.hbase</groupId>
+            <artifactId>hbase-common</artifactId>
+            <version>${hbase.version}</version>
+          </dependency>
+
+          <dependency>
+            <groupId>org.apache.hbase</groupId>
+            <artifactId>hbase-common</artifactId>
+            <version>${hbase.version}</version>
+            <classifier>tests</classifier>
+            <scope>test</scope>
+          </dependency>
+
+          <dependency>
+            <groupId>org.apache.hbase</groupId>
+            <artifactId>hbase-server</artifactId>
+            <version>${hbase.version}</version>
+            <scope>test</scope>
+          </dependency>
+
+          <dependency>
+            <groupId>org.apache.hbase</groupId>
+            <artifactId>hbase-server</artifactId>
+            <version>${hbase.version}</version>
+            <classifier>tests</classifier>
+            <scope>test</scope>
+          </dependency>
+
+          <dependency>
+            <groupId>org.apache.zookeeper</groupId>
+            <artifactId>zookeeper</artifactId>
+            <version>3.4.5</version>
+            <scope>test</scope>
+          </dependency>
+
+          <!-- only compatible with hadoop-2 -->
+          <dependency>
+            <groupId>org.apache.flume.flume-ng-sinks</groupId>
+            <artifactId>flume-dataset-sink</artifactId>
+            <version>${project.version}</version>
+          </dependency>
+        </dependencies>
+      </dependencyManagement>
+    </profile>
+
+    <profile>
       <id>compileThriftLegacy</id>
       <activation>
         <activeByDefault>false</activeByDefault>
@@ -859,20 +1027,6 @@ limitations under the License.
       </dependency>
 
       <dependency>
-        <groupId>org.apache.hbase</groupId>
-        <artifactId>hbase</artifactId>
-        <version>${hbase.version}</version>
-      </dependency>
-
-      <dependency>
-        <groupId>org.apache.hbase</groupId>
-        <artifactId>hbase</artifactId>
-        <version>${hbase.version}</version>
-        <classifier>tests</classifier>
-        <scope>test</scope>
-      </dependency>
-
-      <dependency>
         <groupId>org.apache.hadoop</groupId>
         <artifactId>hadoop</artifactId>
         <version>${hadoop.version}</version>