You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by ja...@apache.org on 2014/06/05 18:29:47 UTC
git commit: FLUME-2397: HBase-98 compatibility
Repository: flume
Updated Branches:
refs/heads/trunk 09472ba12 -> 0cba73698
FLUME-2397: HBase-98 compatibility
(Hari Shreedharan via Jarek Jarcec Cecho)
Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/0cba7369
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/0cba7369
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/0cba7369
Branch: refs/heads/trunk
Commit: 0cba73698dbba6b78d0a2cd7b469f4377723470a
Parents: 09472ba
Author: Jarek Jarcec Cecho <ja...@apache.org>
Authored: Thu Jun 5 09:28:02 2014 -0700
Committer: Jarek Jarcec Cecho <ja...@apache.org>
Committed: Thu Jun 5 09:28:02 2014 -0700
----------------------------------------------------------------------
flume-ng-sinks/flume-hdfs-sink/pom.xml | 30 +++
flume-ng-sinks/flume-ng-hbase-sink/pom.xml | 114 ++++++++++--
.../apache/flume/sink/hbase/AsyncHBaseSink.java | 28 ++-
.../flume/sink/hbase/TestAsyncHBaseSink.java | 1 +
pom.xml | 182 +++++++++++++++++--
5 files changed, 322 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flume/blob/0cba7369/flume-ng-sinks/flume-hdfs-sink/pom.xml
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/pom.xml b/flume-ng-sinks/flume-hdfs-sink/pom.xml
index e0760ae..83f8bec 100644
--- a/flume-ng-sinks/flume-hdfs-sink/pom.xml
+++ b/flume-ng-sinks/flume-hdfs-sink/pom.xml
@@ -161,6 +161,36 @@ limitations under the License.
</dependencies>
</profile>
+ <profile>
+ <id>hbase-98</id>
+ <activation>
+ <property>
+ <name>hadoop.profile</name>
+ <value>hbase-98</value>
+ </property>
+ </activation>
+ <dependencies>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ <optional>true</optional>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-auth</artifactId>
+ <optional>true</optional>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-minicluster</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ </dependencies>
+ </profile>
</profiles>
</project>
http://git-wip-us.apache.org/repos/asf/flume/blob/0cba7369/flume-ng-sinks/flume-ng-hbase-sink/pom.xml
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/pom.xml b/flume-ng-sinks/flume-ng-hbase-sink/pom.xml
index ddb1163..cc2bbee 100644
--- a/flume-ng-sinks/flume-ng-hbase-sink/pom.xml
+++ b/flume-ng-sinks/flume-ng-hbase-sink/pom.xml
@@ -56,18 +56,6 @@
<artifactId>guava</artifactId>
</dependency>
- <dependency>
- <groupId>org.apache.hbase</groupId>
- <artifactId>hbase</artifactId>
- <optional>true</optional>
- </dependency>
-
- <dependency>
- <groupId>org.apache.hbase</groupId>
- <artifactId>hbase</artifactId>
- <classifier>tests</classifier>
- <scope>test</scope>
- </dependency>
<dependency>
<groupId>org.hbase</groupId>
@@ -142,6 +130,24 @@
<artifactId>jersey-core</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase</artifactId>
+ <optional>true</optional>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase</artifactId>
+ <classifier>tests</classifier>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.zookeeper</groupId>
+ <artifactId>zookeeper</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</profile>
<profile>
@@ -158,6 +164,90 @@
<artifactId>hadoop-minicluster</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase</artifactId>
+ <optional>true</optional>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase</artifactId>
+ <classifier>tests</classifier>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.zookeeper</groupId>
+ <artifactId>zookeeper</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+ </profile>
+ <profile>
+ <id>hbase-98</id>
+ <activation>
+ <property>
+ <name>hadoop.profile</name>
+ <value>hbase-98</value>
+ </property>
+ </activation>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-minicluster</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-client</artifactId>
+ <optional>true</optional>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-client</artifactId>
+ <classifier>tests</classifier>
+ <scope>test</scope>
+ </dependency>
+
+
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-server</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-server</artifactId>
+ <classifier>tests</classifier>
+ <scope>test</scope>
+ </dependency>
+
+ <!-- There should be no need for Flume to include the following two
+ artifacts, but the HBase pom has a bug which causes them to not get
+ pulled in, so we have to pull them in ourselves. Ideally they should be
+ optional, but making them optional causes the build to fail.
+ -->
+
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-common</artifactId>
+ <optional>true</optional>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-testing-util</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.zookeeper</groupId>
+ <artifactId>zookeeper</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</profile>
</profiles>
http://git-wip-us.apache.org/repos/asf/flume/blob/0cba7369/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java
index 2d03271..1666be4 100644
--- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java
+++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java
@@ -47,6 +47,8 @@ import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.hbase.async.AtomicIncrementRequest;
import org.hbase.async.HBaseClient;
import org.hbase.async.PutRequest;
+import org.jboss.netty.channel.socket.nio
+ .NioClientSocketChannelFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -409,13 +411,17 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
+ "before calling start on an old instance.");
sinkCounter.start();
sinkCounter.incrementConnectionCreatedCount();
- if (!isTimeoutTest) {
sinkCallbackPool = Executors.newCachedThreadPool(new ThreadFactoryBuilder()
.setNameFormat(this.getName() + " HBase Call Pool").build());
+ logger.info("Callback pool created");
+ if(!isTimeoutTest) {
+ client = new HBaseClient(zkQuorum, zkBaseDir, sinkCallbackPool);
} else {
- sinkCallbackPool = Executors.newSingleThreadExecutor();
+ client = new HBaseClient(zkQuorum, zkBaseDir,
+ new NioClientSocketChannelFactory(Executors
+ .newSingleThreadExecutor(),
+ Executors.newSingleThreadExecutor()));
}
- client = new HBaseClient(zkQuorum, zkBaseDir, sinkCallbackPool);
final CountDownLatch latch = new CountDownLatch(1);
final AtomicBoolean fail = new AtomicBoolean(false);
client.ensureTableFamilyExists(
@@ -424,6 +430,7 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
@Override
public Object call(Object arg) throws Exception {
latch.countDown();
+ logger.info("table found");
return null;
}
},
@@ -437,7 +444,9 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
});
try {
+ logger.info("waiting on callback");
latch.await();
+ logger.info("callback received");
} catch (InterruptedException e) {
sinkCounter.incrementConnectionFailedCount();
throw new FlumeException(
@@ -465,15 +474,20 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
}
sinkCounter.incrementConnectionClosedCount();
sinkCounter.stop();
- sinkCallbackPool.shutdown();
+
try {
- if(!sinkCallbackPool.awaitTermination(5, TimeUnit.SECONDS)) {
- sinkCallbackPool.shutdownNow();
+ if (sinkCallbackPool != null) {
+ sinkCallbackPool.shutdown();
+ if (!sinkCallbackPool.awaitTermination(5, TimeUnit.SECONDS)) {
+ sinkCallbackPool.shutdownNow();
+ }
}
} catch (InterruptedException e) {
logger.error("Interrupted while waiting for asynchbase sink pool to " +
"die", e);
- sinkCallbackPool.shutdownNow();
+ if (sinkCallbackPool != null) {
+ sinkCallbackPool.shutdownNow();
+ }
}
sinkCallbackPool = null;
client = null;
http://git-wip-us.apache.org/repos/asf/flume/blob/0cba7369/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java
index ccbc086..af90f99 100644
--- a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java
+++ b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java
@@ -209,6 +209,7 @@ public class TestAsyncHBaseSink {
Channel channel = new MemoryChannel();
Configurables.configure(channel, ctx);
sink.setChannel(channel);
+ channel.start();
sink.start();
Transaction tx = channel.getTransaction();
tx.begin();
http://git-wip-us.apache.org/repos/asf/flume/blob/0cba7369/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index d4b7660..5d31d4c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -97,6 +97,25 @@ limitations under the License.
<artifactId>hadoop-test</artifactId>
<version>${hadoop.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase</artifactId>
+ <version>${hbase.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase</artifactId>
+ <version>${hbase.version}</version>
+ <classifier>tests</classifier>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.zookeeper</groupId>
+ <artifactId>zookeeper</artifactId>
+ <version>3.4.5</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</dependencyManagement>
</profile>
@@ -148,6 +167,27 @@ limitations under the License.
<version>${hadoop.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase</artifactId>
+ <version>${hbase.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase</artifactId>
+ <version>${hbase.version}</version>
+ <classifier>tests</classifier>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.zookeeper</groupId>
+ <artifactId>zookeeper</artifactId>
+ <version>3.4.5</version>
+ <scope>test</scope>
+ </dependency>
+
<!-- only compatible with hadoop-2 -->
<dependency>
<groupId>org.apache.flume.flume-ng-sinks</groupId>
@@ -159,6 +199,134 @@ limitations under the License.
</profile>
<profile>
+ <id>hbase-98</id>
+ <activation>
+ <property>
+ <name>hadoop.profile</name>
+ <value>hbase-98</value>
+ </property>
+ </activation>
+ <properties>
+ <hadoop.version>${hadoop2.version}</hadoop.version>
+ <hbase.version>0.98.2-hadoop2</hbase.version>
+ <hadoop.common.artifact.id>hadoop-common</hadoop.common.artifact.id>
+ <thrift.version>0.8.0</thrift.version>
+ </properties>
+ <dependencyManagement>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>${hadoop.common.artifact.id}</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-minicluster</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-annotations</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-auth</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-minicluster</artifactId>
+ <version>${hadoop.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <!-- Ideally this should be optional, but making it optional causes
+ the build to fail.
+ -->
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-client</artifactId>
+ <version>${hbase.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-client</artifactId>
+ <version>${hbase.version}</version>
+ <classifier>tests</classifier>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-testing-util</artifactId>
+ <version>${hbase.version}</version>
+ </dependency>
+
+ <!-- There should be no need for Flume to include the following two
+ artifacts, but the HBase pom has a bug which causes them to not get
+ pulled in, so we have to pull them in ourselves. Ideally they should be
+ optional, but making them optional causes the build to fail.
+ -->
+
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-common</artifactId>
+ <version>${hbase.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-common</artifactId>
+ <version>${hbase.version}</version>
+ <classifier>tests</classifier>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-server</artifactId>
+ <version>${hbase.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-server</artifactId>
+ <version>${hbase.version}</version>
+ <classifier>tests</classifier>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.zookeeper</groupId>
+ <artifactId>zookeeper</artifactId>
+ <version>3.4.5</version>
+ <scope>test</scope>
+ </dependency>
+
+ <!-- only compatible with hadoop-2 -->
+ <dependency>
+ <groupId>org.apache.flume.flume-ng-sinks</groupId>
+ <artifactId>flume-dataset-sink</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
+ </dependencyManagement>
+ </profile>
+
+ <profile>
<id>compileThriftLegacy</id>
<activation>
<activeByDefault>false</activeByDefault>
@@ -859,20 +1027,6 @@ limitations under the License.
</dependency>
<dependency>
- <groupId>org.apache.hbase</groupId>
- <artifactId>hbase</artifactId>
- <version>${hbase.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.hbase</groupId>
- <artifactId>hbase</artifactId>
- <version>${hbase.version}</version>
- <classifier>tests</classifier>
- <scope>test</scope>
- </dependency>
-
- <dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop</artifactId>
<version>${hadoop.version}</version>