You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by de...@apache.org on 2017/10/19 16:26:37 UTC

flume git commit: FLUME-3186. Make asyncHbaseClient config parameters available from Flume config

Repository: flume
Updated Branches:
  refs/heads/trunk c7b95a746 -> ed288acba


FLUME-3186. Make asyncHbaseClient config parameters available from Flume config

This patch adds the ability to set the asyncHbaseClient's config parameters via
the Flume configuration.

This closes #178

Reviewers: Ferenc Szabo, Denes Arvay

(Miklos Csanady via Denes Arvay)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/ed288acb
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/ed288acb
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/ed288acb

Branch: refs/heads/trunk
Commit: ed288acba39bfd611c10b338e36224c1415c2c4c
Parents: c7b95a7
Author: Miklos Csanady <mi...@cloudera.com>
Authored: Thu Oct 19 18:21:16 2017 +0200
Committer: Denes Arvay <de...@apache.org>
Committed: Thu Oct 19 18:25:15 2017 +0200

----------------------------------------------------------------------
 flume-ng-doc/sphinx/FlumeUserGuide.rst          |   4 +
 .../apache/flume/sink/hbase/AsyncHBaseSink.java |  19 +++-
 .../hbase/HBaseSinkConfigurationConstants.java  |   6 ++
 .../hbase/TestAsyncHBaseSinkConfiguration.java  | 107 +++++++++++++++++++
 4 files changed, 135 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/ed288acb/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index 6d7085c..73ed7b8 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -2540,6 +2540,10 @@ timeout              60000
                                                                                    all events in a transaction.
 serializer           org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer
 serializer.*         --                                                            Properties to be passed to the serializer.
+async.*              --                                                            Properties to be passed to the AsyncHBase library.
+                                                                                   These properties take precedence over the old ``zookeeperQuorum`` and ``znodeParent`` values.
+                                                                                   You can find the list of the available properties at
+                                                                                   `the documentation page of AsyncHBase <http://opentsdb.github.io/asynchbase/docs/build/html/configuration.html#properties>`_.
 ===================  ============================================================  ====================================================================================
 
 Note that this sink takes the Zookeeper Quorum and parent znode information in

http://git-wip-us.apache.org/repos/asf/flume/blob/ed288acb/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java
index c202a57..881f661 100644
--- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java
+++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.zookeeper.ZKConfig;
 import org.hbase.async.AtomicIncrementRequest;
+import org.hbase.async.Config;
 import org.hbase.async.HBaseClient;
 import org.hbase.async.PutRequest;
 import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
@@ -107,6 +108,9 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
   private long batchSize;
   private static final Logger logger = LoggerFactory.getLogger(AsyncHBaseSink.class);
   private AsyncHbaseEventSerializer serializer;
+
+  @VisibleForTesting
+  Config asyncClientConfig;
   private String eventSerializerType;
   private Context serializerContext;
   private HBaseClient client;
@@ -422,6 +426,19 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
         context.getInteger(HBaseSinkConfigurationConstants.CONFIG_MAX_CONSECUTIVE_FAILS,
                            HBaseSinkConfigurationConstants.DEFAULT_MAX_CONSECUTIVE_FAILS);
 
+
+    Map<String, String> asyncProperties
+            = context.getSubProperties(HBaseSinkConfigurationConstants.ASYNC_PREFIX);
+    asyncClientConfig = new Config();
+    asyncClientConfig.overrideConfig(
+            HBaseSinkConfigurationConstants.ASYNC_ZK_QUORUM_KEY, zkQuorum
+    );
+    asyncClientConfig.overrideConfig(
+            HBaseSinkConfigurationConstants.ASYNC_ZK_BASEPATH_KEY, zkBaseDir
+    );
+    for (String property: asyncProperties.keySet()) {
+      asyncClientConfig.overrideConfig(property, asyncProperties.get(property));
+    }
   }
 
   @VisibleForTesting
@@ -450,7 +467,7 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
     sinkCallbackPool = Executors.newCachedThreadPool(new ThreadFactoryBuilder()
         .setNameFormat(this.getName() + " HBase Call Pool").build());
     logger.info("Callback pool created");
-    client = new HBaseClient(zkQuorum, zkBaseDir,
+    client = new HBaseClient(asyncClientConfig,
         new NioClientSocketChannelFactory(sinkCallbackPool, sinkCallbackPool));
 
     final CountDownLatch latch = new CountDownLatch(1);

http://git-wip-us.apache.org/repos/asf/flume/blob/ed288acb/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkConfigurationConstants.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkConfigurationConstants.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkConfigurationConstants.java
index 5560624..f9ca4bf 100644
--- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkConfigurationConstants.java
+++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkConfigurationConstants.java
@@ -74,4 +74,10 @@ public class HBaseSinkConfigurationConstants {
 
   public static final String CONFIG_MAX_CONSECUTIVE_FAILS = "maxConsecutiveFails";
 
+  public static final String ASYNC_PREFIX = "async.";
+
+  public static final String ASYNC_ZK_QUORUM_KEY = "hbase.zookeeper.quorum";
+
+  public static final String ASYNC_ZK_BASEPATH_KEY = "hbase.zookeeper.znode.parent";
+
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/ed288acb/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSinkConfiguration.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSinkConfiguration.java b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSinkConfiguration.java
new file mode 100644
index 0000000..d4cc360
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSinkConfiguration.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flume.sink.hbase;
+
+import org.apache.flume.Context;
+import org.apache.flume.conf.Configurables;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class TestAsyncHBaseSinkConfiguration {
+  // Suite purpose: verify the asyncClientConfig that AsyncHBaseSink holds once configured.
+  private static final String tableName = "TestHbaseSink";
+  private static final String columnFamily = "TestColumnFamily";
+  private static Context ctx = new Context();
+
+  // Runs before each test: resets ctx to hold only the table and columnFamily entries.
+  @Before
+  public void setUp() throws Exception {
+    Map<String, String> ctxMap = new HashMap<>();
+    ctxMap.put("table", tableName);
+    ctxMap.put("columnFamily", columnFamily);
+    ctx = new Context();
+    ctx.putAll(ctxMap);
+  }
+
+
+  // FLUME-3186: legacy ZK_QUORUM / ZK_ZNODE_PARENT values must end up in asyncClientConfig.
+  @Test
+  public void testAsyncConfigBackwardCompatibility() throws Exception {
+    // Old way: only the legacy ZooKeeper keys are set
+    String oldZkQuorumTestValue = "old_zookeeper_quorum_test_value";
+    String oldZkZnodeParentValue = "old_zookeeper_znode_parent_test_value";
+    ctx.put(HBaseSinkConfigurationConstants.ZK_QUORUM, oldZkQuorumTestValue);
+    ctx.put(HBaseSinkConfigurationConstants.ZK_ZNODE_PARENT,oldZkZnodeParentValue);
+    AsyncHBaseSink sink = new AsyncHBaseSink();
+    Configurables.configure(sink, ctx);
+    Assert.assertEquals(
+            oldZkQuorumTestValue,
+            sink.asyncClientConfig.getString(HBaseSinkConfigurationConstants.ASYNC_ZK_QUORUM_KEY));
+    Assert.assertEquals(
+            oldZkZnodeParentValue,
+            sink.asyncClientConfig.getString(
+                    HBaseSinkConfigurationConstants.ASYNC_ZK_BASEPATH_KEY));
+  }
+  // FLUME-3186: values given under ASYNC_PREFIX must replace the legacy ZooKeeper values.
+  @Test
+  public void testAsyncConfigNewStyleOverwriteOldOne() throws Exception {
+    // Old way: legacy ZooKeeper keys, expected to be overridden by the prefixed ones below
+    String oldZkQuorumTestValue = "old_zookeeper_quorum_test_value";
+    String oldZkZnodeParentValue = "old_zookeeper_znode_parent_test_value";
+    ctx.put(HBaseSinkConfigurationConstants.ZK_QUORUM, oldZkQuorumTestValue);
+    ctx.put(HBaseSinkConfigurationConstants.ZK_ZNODE_PARENT,oldZkZnodeParentValue);
+
+    String newZkQuorumTestValue = "new_zookeeper_quorum_test_value";
+    String newZkZnodeParentValue = "new_zookeeper_znode_parent_test_value";
+    ctx.put(
+            HBaseSinkConfigurationConstants.ASYNC_PREFIX +
+                    HBaseSinkConfigurationConstants.ASYNC_ZK_QUORUM_KEY,
+            newZkQuorumTestValue);
+    ctx.put(
+            HBaseSinkConfigurationConstants.ASYNC_PREFIX +
+                    HBaseSinkConfigurationConstants.ASYNC_ZK_BASEPATH_KEY,
+            newZkZnodeParentValue);
+    AsyncHBaseSink sink = new AsyncHBaseSink();
+    Configurables.configure(sink, ctx);
+    Assert.assertEquals(
+            newZkQuorumTestValue,
+            sink.asyncClientConfig.getString(HBaseSinkConfigurationConstants.ASYNC_ZK_QUORUM_KEY));
+    Assert.assertEquals(
+            newZkZnodeParentValue,
+            sink.asyncClientConfig.getString(
+                    HBaseSinkConfigurationConstants.ASYNC_ZK_BASEPATH_KEY));
+  }
+  // FLUME-3186: an arbitrary key under ASYNC_PREFIX is passed on with the prefix stripped.
+  @Test
+  public void testAsyncConfigAnyKeyCanBePassed() throws Exception {
+    String valueOfANewProp = "vale of the new property";
+    String keyOfANewProp = "some.key.to.be.passed";
+    ctx.put(HBaseSinkConfigurationConstants.ASYNC_PREFIX + keyOfANewProp, valueOfANewProp);
+    AsyncHBaseSink sink = new AsyncHBaseSink();
+    Configurables.configure(sink, ctx);
+    Assert.assertEquals(valueOfANewProp, sink.asyncClientConfig.getString(keyOfANewProp));
+  }
+}
+
+