You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by de...@apache.org on 2017/10/19 16:26:37 UTC
flume git commit: FLUME-3186. Make asyncHbaseClient config parameters
available from Flume config
Repository: flume
Updated Branches:
refs/heads/trunk c7b95a746 -> ed288acba
FLUME-3186. Make asyncHbaseClient config parameters available from Flume config
This patch adds the ability to set the asyncHbaseClient's config parameters via
the Flume configuration.
This closes #178
Reviewers: Ferenc Szabo, Denes Arvay
(Miklos Csanady via Denes Arvay)
Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/ed288acb
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/ed288acb
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/ed288acb
Branch: refs/heads/trunk
Commit: ed288acba39bfd611c10b338e36224c1415c2c4c
Parents: c7b95a7
Author: Miklos Csanady <mi...@cloudera.com>
Authored: Thu Oct 19 18:21:16 2017 +0200
Committer: Denes Arvay <de...@apache.org>
Committed: Thu Oct 19 18:25:15 2017 +0200
----------------------------------------------------------------------
flume-ng-doc/sphinx/FlumeUserGuide.rst | 4 +
.../apache/flume/sink/hbase/AsyncHBaseSink.java | 19 +++-
.../hbase/HBaseSinkConfigurationConstants.java | 6 ++
.../hbase/TestAsyncHBaseSinkConfiguration.java | 107 +++++++++++++++++++
4 files changed, 135 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flume/blob/ed288acb/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index 6d7085c..73ed7b8 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -2540,6 +2540,10 @@ timeout 60000
all events in a transaction.
serializer org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer
serializer.* -- Properties to be passed to the serializer.
+async.* -- Properties to be passed to asyncHbase library.
+ These properties have precedence over the old ``zookeeperQuorum`` and ``znodeParent`` values.
+ You can find the list of the available properties at
+ `the documentation page of AsyncHBase <http://opentsdb.github.io/asynchbase/docs/build/html/configuration.html#properties>`_.
=================== ============================================================ ====================================================================================
Note that this sink takes the Zookeeper Quorum and parent znode information in
http://git-wip-us.apache.org/repos/asf/flume/blob/ed288acb/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java
index c202a57..881f661 100644
--- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java
+++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.hbase.async.AtomicIncrementRequest;
+import org.hbase.async.Config;
import org.hbase.async.HBaseClient;
import org.hbase.async.PutRequest;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
@@ -107,6 +108,9 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
private long batchSize;
private static final Logger logger = LoggerFactory.getLogger(AsyncHBaseSink.class);
private AsyncHbaseEventSerializer serializer;
+
+ @VisibleForTesting
+ Config asyncClientConfig;
private String eventSerializerType;
private Context serializerContext;
private HBaseClient client;
@@ -422,6 +426,19 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
context.getInteger(HBaseSinkConfigurationConstants.CONFIG_MAX_CONSECUTIVE_FAILS,
HBaseSinkConfigurationConstants.DEFAULT_MAX_CONSECUTIVE_FAILS);
+
+ Map<String, String> asyncProperties
+ = context.getSubProperties(HBaseSinkConfigurationConstants.ASYNC_PREFIX);
+ asyncClientConfig = new Config();
+ asyncClientConfig.overrideConfig(
+ HBaseSinkConfigurationConstants.ASYNC_ZK_QUORUM_KEY, zkQuorum
+ );
+ asyncClientConfig.overrideConfig(
+ HBaseSinkConfigurationConstants.ASYNC_ZK_BASEPATH_KEY, zkBaseDir
+ );
+ for (String property: asyncProperties.keySet()) {
+ asyncClientConfig.overrideConfig(property, asyncProperties.get(property));
+ }
}
@VisibleForTesting
@@ -450,7 +467,7 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
sinkCallbackPool = Executors.newCachedThreadPool(new ThreadFactoryBuilder()
.setNameFormat(this.getName() + " HBase Call Pool").build());
logger.info("Callback pool created");
- client = new HBaseClient(zkQuorum, zkBaseDir,
+ client = new HBaseClient(asyncClientConfig,
new NioClientSocketChannelFactory(sinkCallbackPool, sinkCallbackPool));
final CountDownLatch latch = new CountDownLatch(1);
http://git-wip-us.apache.org/repos/asf/flume/blob/ed288acb/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkConfigurationConstants.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkConfigurationConstants.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkConfigurationConstants.java
index 5560624..f9ca4bf 100644
--- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkConfigurationConstants.java
+++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/HBaseSinkConfigurationConstants.java
@@ -74,4 +74,10 @@ public class HBaseSinkConfigurationConstants {
public static final String CONFIG_MAX_CONSECUTIVE_FAILS = "maxConsecutiveFails";
+ public static final String ASYNC_PREFIX = "async.";
+
+ public static final String ASYNC_ZK_QUORUM_KEY = "hbase.zookeeper.quorum";
+
+ public static final String ASYNC_ZK_BASEPATH_KEY = "hbase.zookeeper.znode.parent";
+
}
http://git-wip-us.apache.org/repos/asf/flume/blob/ed288acb/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSinkConfiguration.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSinkConfiguration.java b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSinkConfiguration.java
new file mode 100644
index 0000000..d4cc360
--- /dev/null
+++ b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSinkConfiguration.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flume.sink.hbase;
+
+import org.apache.flume.Context;
+import org.apache.flume.conf.Configurables;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class TestAsyncHBaseSinkConfiguration {
+
+ private static final String tableName = "TestHbaseSink";
+ private static final String columnFamily = "TestColumnFamily";
+ private static Context ctx = new Context();
+
+
+ @Before
+ public void setUp() throws Exception {
+ Map<String, String> ctxMap = new HashMap<>();
+ ctxMap.put("table", tableName);
+ ctxMap.put("columnFamily", columnFamily);
+ ctx = new Context();
+ ctx.putAll(ctxMap);
+ }
+
+
+ //FLUME-3186 Make asyncHbaseClient configuration parameters available from flume config
+ @Test
+ public void testAsyncConfigBackwardCompatibility() throws Exception {
+ //Old way: zookeeperQuorum
+ String oldZkQuorumTestValue = "old_zookeeper_quorum_test_value";
+ String oldZkZnodeParentValue = "old_zookeeper_znode_parent_test_value";
+ ctx.put(HBaseSinkConfigurationConstants.ZK_QUORUM, oldZkQuorumTestValue);
+ ctx.put(HBaseSinkConfigurationConstants.ZK_ZNODE_PARENT,oldZkZnodeParentValue);
+ AsyncHBaseSink sink = new AsyncHBaseSink();
+ Configurables.configure(sink, ctx);
+ Assert.assertEquals(
+ oldZkQuorumTestValue,
+ sink.asyncClientConfig.getString(HBaseSinkConfigurationConstants.ASYNC_ZK_QUORUM_KEY));
+ Assert.assertEquals(
+ oldZkZnodeParentValue,
+ sink.asyncClientConfig.getString(
+ HBaseSinkConfigurationConstants.ASYNC_ZK_BASEPATH_KEY));
+ }
+
+ @Test
+ public void testAsyncConfigNewStyleOverwriteOldOne() throws Exception {
+ //Old way: zookeeperQuorum
+ String oldZkQuorumTestValue = "old_zookeeper_quorum_test_value";
+ String oldZkZnodeParentValue = "old_zookeeper_znode_parent_test_value";
+ ctx.put(HBaseSinkConfigurationConstants.ZK_QUORUM, oldZkQuorumTestValue);
+ ctx.put(HBaseSinkConfigurationConstants.ZK_ZNODE_PARENT,oldZkZnodeParentValue);
+
+ String newZkQuorumTestValue = "new_zookeeper_quorum_test_value";
+ String newZkZnodeParentValue = "new_zookeeper_znode_parent_test_value";
+ ctx.put(
+ HBaseSinkConfigurationConstants.ASYNC_PREFIX +
+ HBaseSinkConfigurationConstants.ASYNC_ZK_QUORUM_KEY,
+ newZkQuorumTestValue);
+ ctx.put(
+ HBaseSinkConfigurationConstants.ASYNC_PREFIX +
+ HBaseSinkConfigurationConstants.ASYNC_ZK_BASEPATH_KEY,
+ newZkZnodeParentValue);
+ AsyncHBaseSink sink = new AsyncHBaseSink();
+ Configurables.configure(sink, ctx);
+ Assert.assertEquals(
+ newZkQuorumTestValue,
+ sink.asyncClientConfig.getString(HBaseSinkConfigurationConstants.ASYNC_ZK_QUORUM_KEY));
+ Assert.assertEquals(
+ newZkZnodeParentValue,
+ sink.asyncClientConfig.getString(
+ HBaseSinkConfigurationConstants.ASYNC_ZK_BASEPATH_KEY));
+ }
+
+ @Test
+ public void testAsyncConfigAnyKeyCanBePassed() throws Exception {
+ String valueOfANewProp = "vale of the new property";
+ String keyOfANewProp = "some.key.to.be.passed";
+ ctx.put(HBaseSinkConfigurationConstants.ASYNC_PREFIX + keyOfANewProp, valueOfANewProp);
+ AsyncHBaseSink sink = new AsyncHBaseSink();
+ Configurables.configure(sink, ctx);
+ Assert.assertEquals(valueOfANewProp, sink.asyncClientConfig.getString(keyOfANewProp));
+ }
+}
+
+