You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/11/24 19:21:22 UTC

[2/6] camel git commit: handling UGI within camel-hbase

handling UGI within camel-hbase


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/e0b3255c
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/e0b3255c
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/e0b3255c

Branch: refs/heads/master
Commit: e0b3255c233873dec0c14b593d7d82bb1efa4826
Parents: db823ea
Author: woj-i <wo...@gmail.com>
Authored: Thu Oct 15 18:29:29 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Tue Nov 24 19:07:55 2015 +0100

----------------------------------------------------------------------
 .../camel/component/hbase/HBaseConsumer.java    | 12 +++----
 .../camel/component/hbase/HBaseEndpoint.java    | 37 ++++++++++++++++++--
 .../camel/component/hbase/HBaseProducer.java    | 11 ++----
 3 files changed, 41 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/e0b3255c/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseConsumer.java b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseConsumer.java
index bf3760d..59aaa43 100644
--- a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseConsumer.java
+++ b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseConsumer.java
@@ -21,6 +21,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Queue;
 import java.util.Set;
+
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.component.hbase.mapping.CellMappingStrategy;
@@ -33,7 +34,6 @@ import org.apache.camel.util.CastUtils;
 import org.apache.camel.util.ObjectHelper;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.HTablePool;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
@@ -50,22 +50,18 @@ public class HBaseConsumer extends ScheduledBatchPollingConsumer {
 
     private static final Logger LOG = LoggerFactory.getLogger(HBaseConsumer.class);
 
-    private String tableName;
     private final HBaseEndpoint endpoint;
-    private HTablePool tablePool;
     private HBaseRow rowModel;
 
-    public HBaseConsumer(HBaseEndpoint endpoint, Processor processor, HTablePool tablePool, String tableName) {
+    public HBaseConsumer(HBaseEndpoint endpoint, Processor processor) {
         super(endpoint, processor);
         this.endpoint = endpoint;
-        this.tableName = tableName;
-        this.tablePool = tablePool;
         this.rowModel = endpoint.getRowModel();
     }
 
     @Override
     protected int poll() throws Exception {
-        HTableInterface table = tablePool.getTable(tableName);
+        HTableInterface table = endpoint.getTable();
         try {
             shutdownRunningTask = null;
             pendingExchanges = 0;
@@ -192,7 +188,7 @@ public class HBaseConsumer extends ScheduledBatchPollingConsumer {
      * Delegates to the {@link HBaseRemoveHandler}.
      */
     private void remove(byte[] row) throws IOException {
-        HTableInterface table = tablePool.getTable(tableName);
+        HTableInterface table = endpoint.getTable();
         try {
             endpoint.getRemoveHandler().remove(table, row);
         } finally {

http://git-wip-us.apache.org/repos/asf/camel/blob/e0b3255c/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseEndpoint.java b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseEndpoint.java
index cb08a02..109418c 100644
--- a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseEndpoint.java
+++ b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseEndpoint.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.component.hbase;
 
+import java.security.PrivilegedAction;
 import java.util.List;
 
 import org.apache.camel.Consumer;
@@ -30,8 +31,10 @@ import org.apache.camel.spi.UriParam;
 import org.apache.camel.spi.UriPath;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.HTablePool;
 import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.security.UserGroupInformation;
 
 /**
  * Represents an HBase endpoint.
@@ -66,21 +69,30 @@ public class HBaseEndpoint extends DefaultEndpoint {
     @UriParam(label = "consumer")
     private int maxMessagesPerPoll;
 
+	/**
+	 * in the purpose of performance optimization
+	 */
+	private byte[] tableNameBytes;
+
+	private UserGroupInformation ugi = null;
+
     public HBaseEndpoint(String uri, HBaseComponent component, HTablePool tablePool, String tableName) {
         super(uri, component);
         this.tableName = tableName;
         this.tablePool = tablePool;
         if (this.tableName == null) {
             throw new IllegalArgumentException("Table name can not be null");
-        }
+        }else{
+			tableNameBytes = tableName.getBytes();
+		}
     }
 
     public Producer createProducer() throws Exception {
-        return new HBaseProducer(this, tablePool, tableName);
+        return new HBaseProducer(this);
     }
 
     public Consumer createConsumer(Processor processor) throws Exception {
-        HBaseConsumer consumer =  new HBaseConsumer(this, processor, tablePool, tableName);
+        HBaseConsumer consumer =  new HBaseConsumer(this, processor);
         configureConsumer(consumer);
         consumer.setMaxMessagesPerPoll(maxMessagesPerPoll);
         return consumer;
@@ -217,4 +229,23 @@ public class HBaseEndpoint extends DefaultEndpoint {
     public void setMaxMessagesPerPoll(int maxMessagesPerPoll) {
         this.maxMessagesPerPoll = maxMessagesPerPoll;
     }
+
+
+	/**
+	 * Gets connection to the table (secured or not, depends on the object initialization)
+	 * please remember to close the table after use
+	 * @return table, remember to close!
+	 */
+	public HTableInterface getTable(){
+		if (ugi!=null){
+			return ugi.doAs(new PrivilegedAction<HTableInterface>() {
+				@Override
+				public HTableInterface run() {
+					return tablePool.getTable(tableNameBytes);
+				}
+			});
+		}else{
+			return tablePool.getTable(tableNameBytes);
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/e0b3255c/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseProducer.java b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseProducer.java
index 4a921aa..215e502 100644
--- a/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseProducer.java
+++ b/components/camel-hbase/src/main/java/org/apache/camel/component/hbase/HBaseProducer.java
@@ -34,7 +34,6 @@ import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.HTablePool;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
@@ -49,20 +48,16 @@ import org.apache.hadoop.hbase.util.Bytes;
 public class HBaseProducer extends DefaultProducer implements ServicePoolAware {
 
     private HBaseEndpoint endpoint;
-    private String tableName;
-    private final HTablePool tablePool;
     private HBaseRow rowModel;
 
-    public HBaseProducer(HBaseEndpoint endpoint, HTablePool tablePool, String tableName) {
+    public HBaseProducer(HBaseEndpoint endpoint) {
         super(endpoint);
         this.endpoint = endpoint;
-        this.tableName = tableName;
-        this.tablePool = tablePool;
         this.rowModel = endpoint.getRowModel();
     }
 
     public void process(Exchange exchange) throws Exception {
-        HTableInterface table = tablePool.getTable(tableName.getBytes());
+		HTableInterface table = endpoint.getTable();
         try {
 
             updateHeaders(exchange);
@@ -105,7 +100,7 @@ public class HBaseProducer extends DefaultProducer implements ServicePoolAware {
                 mappingStrategy.applyScanResults(exchange.getOut(), new HBaseData(scanOperationResult));
             }
         } finally {
-            table.close();
+			table.close();
         }
     }