You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/11/24 19:21:22 UTC
[2/6] camel git commit: handling UGI within camel-hbase
handling UGI within camel-hbase
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/e0b3255c
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/e0b3255c
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/e0b3255c
Branch: refs/heads/master
Commit: e0b3255c233873dec0c14b593d7d82bb1efa4826
Parents: db823ea
Author: woj-i <wo...@gmail.com>
Authored: Thu Oct 15 18:29:29 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Tue Nov 24 19:07:55 2015 +0100
----------------------------------------------------------------------
.../camel/component/hbase/HBaseConsumer.java | 12 +++----
.../camel/component/hbase/HBaseEndpoint.java | 37 ++++++++++++++++++--
.../camel/component/hbase/HBaseProducer.java | 11 ++----
3 files changed, 41 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/e0b3255c/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseConsumer.java b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseConsumer.java
index bf3760d..59aaa43 100644
--- a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseConsumer.java
+++ b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseConsumer.java
@@ -21,6 +21,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.Set;
+
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.component.hbase.mapping.CellMappingStrategy;
@@ -33,7 +34,6 @@ import org.apache.camel.util.CastUtils;
import org.apache.camel.util.ObjectHelper;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.HTablePool;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
@@ -50,22 +50,18 @@ public class HBaseConsumer extends ScheduledBatchPollingConsumer {
private static final Logger LOG = LoggerFactory.getLogger(HBaseConsumer.class);
- private String tableName;
private final HBaseEndpoint endpoint;
- private HTablePool tablePool;
private HBaseRow rowModel;
- public HBaseConsumer(HBaseEndpoint endpoint, Processor processor, HTablePool tablePool, String tableName) {
+ public HBaseConsumer(HBaseEndpoint endpoint, Processor processor) {
super(endpoint, processor);
this.endpoint = endpoint;
- this.tableName = tableName;
- this.tablePool = tablePool;
this.rowModel = endpoint.getRowModel();
}
@Override
protected int poll() throws Exception {
- HTableInterface table = tablePool.getTable(tableName);
+ HTableInterface table = endpoint.getTable();
try {
shutdownRunningTask = null;
pendingExchanges = 0;
@@ -192,7 +188,7 @@ public class HBaseConsumer extends ScheduledBatchPollingConsumer {
* Delegates to the {@link HBaseRemoveHandler}.
*/
private void remove(byte[] row) throws IOException {
- HTableInterface table = tablePool.getTable(tableName);
+ HTableInterface table = endpoint.getTable();
try {
endpoint.getRemoveHandler().remove(table, row);
} finally {
http://git-wip-us.apache.org/repos/asf/camel/blob/e0b3255c/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseEndpoint.java b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseEndpoint.java
index cb08a02..109418c 100644
--- a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseEndpoint.java
+++ b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseEndpoint.java
@@ -16,6 +16,7 @@
*/
package org.apache.camel.component.hbase;
+import java.security.PrivilegedAction;
import java.util.List;
import org.apache.camel.Consumer;
@@ -30,8 +31,10 @@ import org.apache.camel.spi.UriParam;
import org.apache.camel.spi.UriPath;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.HTablePool;
import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.security.UserGroupInformation;
/**
* Represents an HBase endpoint.
@@ -66,21 +69,30 @@ public class HBaseEndpoint extends DefaultEndpoint {
@UriParam(label = "consumer")
private int maxMessagesPerPoll;
+ /**
+ * in the purpose of performance optimization
+ */
+ private byte[] tableNameBytes;
+
+ private UserGroupInformation ugi = null;
+
public HBaseEndpoint(String uri, HBaseComponent component, HTablePool tablePool, String tableName) {
super(uri, component);
this.tableName = tableName;
this.tablePool = tablePool;
if (this.tableName == null) {
throw new IllegalArgumentException("Table name can not be null");
- }
+ }else{
+ tableNameBytes = tableName.getBytes();
+ }
}
public Producer createProducer() throws Exception {
- return new HBaseProducer(this, tablePool, tableName);
+ return new HBaseProducer(this);
}
public Consumer createConsumer(Processor processor) throws Exception {
- HBaseConsumer consumer = new HBaseConsumer(this, processor, tablePool, tableName);
+ HBaseConsumer consumer = new HBaseConsumer(this, processor);
configureConsumer(consumer);
consumer.setMaxMessagesPerPoll(maxMessagesPerPoll);
return consumer;
@@ -217,4 +229,23 @@ public class HBaseEndpoint extends DefaultEndpoint {
public void setMaxMessagesPerPoll(int maxMessagesPerPoll) {
this.maxMessagesPerPoll = maxMessagesPerPoll;
}
+
+
+ /**
+ * Gets connection to the table (secured or not, depends on the object initialization)
+ * please remember to close the table after use
+ * @return table, remember to close!
+ */
+ public HTableInterface getTable(){
+ if (ugi!=null){
+ return ugi.doAs(new PrivilegedAction<HTableInterface>() {
+ @Override
+ public HTableInterface run() {
+ return tablePool.getTable(tableNameBytes);
+ }
+ });
+ }else{
+ return tablePool.getTable(tableNameBytes);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/e0b3255c/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseProducer.java b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseProducer.java
index 4a921aa..215e502 100644
--- a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseProducer.java
+++ b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseProducer.java
@@ -34,7 +34,6 @@ import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.HTablePool;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
@@ -49,20 +48,16 @@ import org.apache.hadoop.hbase.util.Bytes;
public class HBaseProducer extends DefaultProducer implements ServicePoolAware {
private HBaseEndpoint endpoint;
- private String tableName;
- private final HTablePool tablePool;
private HBaseRow rowModel;
- public HBaseProducer(HBaseEndpoint endpoint, HTablePool tablePool, String tableName) {
+ public HBaseProducer(HBaseEndpoint endpoint) {
super(endpoint);
this.endpoint = endpoint;
- this.tableName = tableName;
- this.tablePool = tablePool;
this.rowModel = endpoint.getRowModel();
}
public void process(Exchange exchange) throws Exception {
- HTableInterface table = tablePool.getTable(tableName.getBytes());
+ HTableInterface table = endpoint.getTable();
try {
updateHeaders(exchange);
@@ -105,7 +100,7 @@ public class HBaseProducer extends DefaultProducer implements ServicePoolAware {
mappingStrategy.applyScanResults(exchange.getOut(), new HBaseData(scanOperationResult));
}
} finally {
- table.close();
+ table.close();
}
}