You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by jg...@apache.org on 2011/10/20 02:42:57 UTC

svn commit: r1186586 - in /hbase/trunk: ./ src/main/java/org/apache/hadoop/hbase/regionserver/ src/main/java/org/apache/hadoop/hbase/thrift/

Author: jgray
Date: Thu Oct 20 00:42:57 2011
New Revision: 1186586

URL: http://svn.apache.org/viewvc?rev=1186586&view=rev
Log:
HBASE-4460  Support running an embedded ThriftServer within a RegionServer (jgray)

Added:
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionThriftServer.java
Modified:
    hbase/trunk/CHANGES.txt
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServer.java

Modified: hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/trunk/CHANGES.txt?rev=1186586&r1=1186585&r2=1186586&view=diff
==============================================================================
--- hbase/trunk/CHANGES.txt (original)
+++ hbase/trunk/CHANGES.txt Thu Oct 20 00:42:57 2011
@@ -1,5 +1,8 @@
 HBase Change Log
 Release 0.93.0 - Unreleased
+  NEW FEATURE
+   HBASE-4460  Support running an embedded ThriftServer within a RegionServer (jgray)
+
   IMPROVEMENT
    HBASE-4132  Extend the WALActionsListener API to accomodate log archival
                (dhruba borthakur)

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1186586&r1=1186585&r2=1186586&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Thu Oct 20 00:42:57 2011
@@ -300,6 +300,9 @@ public class HRegionServer implements HR
   // Cache configuration and block cache reference
   private final CacheConfig cacheConfig;
 
+  // reference to the Thrift Server.
+  volatile private HRegionThriftServer thriftServer;
+
   /**
    * The server name the Master sees us as.  Its made from the hostname the
    * master passes us, port, and server startcode. Gets set after registration
@@ -617,6 +620,13 @@ public class HRegionServer implements HR
         HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY,
         HConstants.DEFAULT_HBASE_REGIONSERVER_LEASE_PERIOD),
         this.threadWakeFrequency);
+
+    // Create the thread for the ThriftServer.
+    if (conf.getBoolean("hbase.regionserver.export.thrift", false)) {
+      thriftServer = new HRegionThriftServer(this, conf);
+      thriftServer.start();
+      LOG.info("Started Thrift API from Region Server.");
+    }
   }
 
   /**
@@ -685,6 +695,7 @@ public class HRegionServer implements HR
       }
     }
     // Run shutdown.
+    if (this.thriftServer != null) this.thriftServer.shutdown();
     this.leases.closeAfterLeasesExpire();
     this.rpcServer.stop();
     if (this.splitLogWorker != null) {

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionThriftServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionThriftServer.java?rev=1186586&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionThriftServer.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionThriftServer.java Thu Oct 20 00:42:57 2011
@@ -0,0 +1,229 @@
+/**
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.NotServingRegionException;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.thrift.ThriftServer;
+import org.apache.hadoop.hbase.thrift.ThriftUtilities;
+import org.apache.hadoop.hbase.thrift.generated.Hbase;
+import org.apache.hadoop.hbase.thrift.generated.IOError;
+import org.apache.hadoop.hbase.thrift.generated.TRowResult;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.protocol.TProtocolFactory;
+import org.apache.thrift.server.TNonblockingServer;
+import org.apache.thrift.server.TServer;
+import org.apache.thrift.server.TThreadPoolServer;
+import org.apache.thrift.transport.TFramedTransport;
+import org.apache.thrift.transport.TNonblockingServerSocket;
+import org.apache.thrift.transport.TNonblockingServerTransport;
+import org.apache.thrift.transport.TServerSocket;
+import org.apache.thrift.transport.TServerTransport;
+import org.apache.thrift.transport.TTransportFactory;
+
+/**
+ * HRegionThriftServer - this class starts up a Thrift server in the same
+ * JVM where the RegionServer is running. It inherits most of the
+ * functionality from the standard ThriftServer. This is good because
+ * we can maintain compatibility with applications that use the
+ * standard Thrift interface. For performance reasons, we can override
+ * methods to directly invoke calls into the HRegionServer and avoid the hop.
+ * <p>
+ * This can be enabled with <i>hbase.regionserver.export.thrift</i> set to true.
+ */
+public class HRegionThriftServer extends Thread {
+
+  public static final Log LOG = LogFactory.getLog(HRegionThriftServer.class);
+  public static final int DEFAULT_LISTEN_PORT = 9090;
+
+  private HRegionServer rs;
+  private Configuration conf;
+
+  private int port;
+  private boolean nonblocking;
+  private String bindIpAddress;
+  private String transport;
+  private String protocol;
+  volatile private TServer tserver;
+
+  /**
+   * Create an instance of the glue object that connects the
+   * RegionServer with the standard ThriftServer implementation
+   */
+  HRegionThriftServer(HRegionServer regionServer, Configuration conf) {
+    this.rs = regionServer;
+    this.conf = conf;
+  }
+
+  /**
+   * Inherit the Handler from the standard ThriftServer. This allows us
+   * to use the default implementation for most calls. We override certain calls
+   * for performance reasons
+   */
+  private class HBaseHandlerRegion extends ThriftServer.HBaseHandler {
+
+    HBaseHandlerRegion(final Configuration conf) throws IOException {
+      super(conf);
+      initialize(conf);
+    }
+
+    // TODO: Override more methods to short-circuit for performance
+
+    /**
+     * Get a record. Short-circuit to get better performance.
+     */
+    @Override
+    public List<TRowResult> getRowWithColumnsTs(ByteBuffer tableName,
+                                                ByteBuffer rowb,
+                                                List<ByteBuffer> columns,
+                                                long timestamp)
+      throws IOError {
+      try {
+        byte [] row = rowb.array();
+        HTable table = getTable(tableName.array());
+        HRegionLocation location = table.getRegionLocation(row);
+        byte[] regionName = location.getRegionInfo().getEncodedNameAsBytes();
+
+        if (columns == null) {
+          Get get = new Get(row);
+          get.setTimeRange(Long.MIN_VALUE, timestamp);
+          Result result = rs.get(regionName, get);
+          return ThriftUtilities.rowResultFromHBase(result);
+        }
+        ByteBuffer[] columnArr = columns.toArray(
+                                   new ByteBuffer[columns.size()]);
+        Get get = new Get(row);
+        for(ByteBuffer column : columnArr) {
+          byte [][] famAndQf = KeyValue.parseColumn(column.array());
+          if (famAndQf.length == 1) {
+            get.addFamily(famAndQf[0]);
+          } else {
+            get.addColumn(famAndQf[0], famAndQf[1]);
+          }
+        }
+        get.setTimeRange(Long.MIN_VALUE, timestamp);
+        Result result = rs.get(regionName, get);
+        return ThriftUtilities.rowResultFromHBase(result);
+      } catch (NotServingRegionException e) {
+        LOG.info("ThriftServer redirecting getRowWithColumnsTs");
+        return super.getRowWithColumnsTs(tableName, rowb, columns, timestamp);
+      } catch (IOException e) {
+        throw new IOError(e.getMessage());
+      }
+    }
+  }
+
+  /**
+   * Read and initialize config parameters
+   */
+  private void initialize(Configuration conf) {
+    this.port = conf.getInt("hbase.regionserver.thrift.port",
+                            DEFAULT_LISTEN_PORT);
+    this.bindIpAddress = conf.get("hbase.regionserver.thrift.ipaddress");
+    this.protocol = conf.get("hbase.regionserver.thrift.protocol");
+    this.transport = conf.get("hbase.regionserver.thrift.transport");
+    this.nonblocking = conf.getBoolean("hbase.regionserver.thrift.nonblocking",
+                                       false);
+  }
+
+  /**
+   * Stop ThriftServer
+   */
+  void shutdown() {
+    if (tserver != null) {
+      tserver.stop();
+      tserver = null;
+    }
+  }
+
+  @Override
+  public void run() {
+    try {
+      HBaseHandlerRegion handler = new HBaseHandlerRegion(this.conf);
+      Hbase.Processor processor = new Hbase.Processor(handler);
+
+      TProtocolFactory protocolFactory;
+      if (this.protocol != null && this.protocol.equals("compact")) {
+        protocolFactory = new TCompactProtocol.Factory();
+      } else {
+        protocolFactory = new TBinaryProtocol.Factory();
+      }
+
+      if (this.nonblocking) {
+        TNonblockingServerTransport serverTransport =
+          new TNonblockingServerSocket(this.port);
+        TFramedTransport.Factory transportFactory =
+          new TFramedTransport.Factory();
+
+        TNonblockingServer.Args serverArgs =
+          new TNonblockingServer.Args(serverTransport);
+        serverArgs.processor(processor);
+        serverArgs.transportFactory(transportFactory);
+        serverArgs.protocolFactory(protocolFactory);
+        LOG.info("starting HRegionServer Nonblocking Thrift server on " +
+            this.port);
+        LOG.info("HRegionServer Nonblocking Thrift server does not " +
+            "support address binding.");
+        tserver = new TNonblockingServer(serverArgs);
+      } else {
+        InetAddress listenAddress = null;
+        if (this.bindIpAddress != null) {
+          listenAddress = InetAddress.getByName(this.bindIpAddress);
+        } else {
+          listenAddress = InetAddress.getLocalHost();
+        }
+        TServerTransport serverTransport = new TServerSocket(
+           new InetSocketAddress(listenAddress, port));
+
+        TTransportFactory transportFactory;
+        if (this.transport != null && this.transport.equals("framed")) {
+          transportFactory = new TFramedTransport.Factory();
+        } else {
+          transportFactory = new TTransportFactory();
+        }
+
+        TThreadPoolServer.Args serverArgs = new TThreadPoolServer.Args(serverTransport);
+        serverArgs.processor(processor);
+        serverArgs.protocolFactory(protocolFactory);
+        serverArgs.transportFactory(transportFactory);
+        LOG.info("starting HRegionServer ThreadPool Thrift server on " +
+                 listenAddress + ":" + this.port);
+        tserver = new TThreadPoolServer(serverArgs);
+      }
+      tserver.serve();
+    } catch (Exception e) {
+      LOG.warn("Unable to start HRegionServerThrift interface.", e);
+    }
+  }
+}

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServer.java?rev=1186586&r1=1186585&r2=1186586&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServer.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServer.java Thu Oct 20 00:42:57 2011
@@ -195,12 +195,12 @@ public class ThriftServer {
      * Constructs an HBaseHandler object.
      * @throws IOException
      */
-    HBaseHandler()
+    protected HBaseHandler()
     throws IOException {
       this(HBaseConfiguration.create());
     }
 
-    HBaseHandler(final Configuration c)
+    protected HBaseHandler(final Configuration c)
     throws IOException {
       this.conf = c;
       admin = new HBaseAdmin(conf);