Posted to commits@hbase.apache.org by ns...@apache.org on 2011/10/11 19:44:10 UTC

svn commit: r1181955 [1/3] - in /hbase/branches/0.89/src/main: java/org/apache/hadoop/hbase/regionserver/ java/org/apache/hadoop/hbase/thrift/ java/org/apache/hadoop/hbase/thrift/generated/ resources/org/apache/hadoop/hbase/thrift/

Author: nspiegelberg
Date: Tue Oct 11 17:44:09 2011
New Revision: 1181955

URL: http://svn.apache.org/viewvc?rev=1181955&view=rev
Log:
Committing dgoode's patch: Wormhole Support in HBase89 Thrift

Summary: Added the single-hop region server support from 0.90 to 0.89 and backported the necessary Thrift calls. Regenerated the Thrift files. This diff is a commit on top of D296169, although it can be separated, with the exception of the Thrift generation, which must be redone.

Added the previously forgotten shutdown call, placed after the master-startup wait to avoid errors.

Reviewed By: kranganathan

Test Plan: Chip will run his wormhole unit tests. I will also attempt to test some of this functionality from WWW on my dev cluster.

Per Chip: it works! test passes!

Revert Plan: OK

- end platform impact -

Added:
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionThriftServer.java
Modified:
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServer.java
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/thrift/generated/Hbase.java
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/thrift/generated/TRegionInfo.java
    hbase/branches/0.89/src/main/resources/org/apache/hadoop/hbase/thrift/Hbase.thrift

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1181955&r1=1181954&r2=1181955&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Tue Oct 11 17:44:09 2011
@@ -263,6 +263,9 @@ public class HRegionServer implements HR
 
   private final AtomicLong globalMemstoreSize = new AtomicLong(0);
 
+  // reference to the Thrift Server.
+  volatile private HRegionThriftServer thriftServer;
+
   /**
    * Starts a HRegionServer at the default location
    * @param conf
@@ -589,6 +592,10 @@ public class HRegionServer implements HR
         abort("Unhandled exception", t);
       }
     }
+    // shutdown thriftserver
+    if (thriftServer != null) {
+      thriftServer.shutdown();
+    }
     this.leases.closeAfterLeasesExpire();
     this.worker.stop();
     this.server.stop();
@@ -764,6 +771,14 @@ public class HRegionServer implements HR
       this.dynamicMetrics = RegionServerDynamicMetrics.newInstance();
       startServiceThreads();
       isOnline = true;
+
+      // Create the thread for the ThriftServer.
+      // NOTE this defaults to FALSE so you have to enable it in conf
+      if (conf.getBoolean("hbase.regionserver.export.thrift", false)) {
+        thriftServer = new HRegionThriftServer(this, conf);
+        thriftServer.start();
+        LOG.info("Started Thrift API from Region Server.");
+      }
     } catch (Throwable e) {
       this.isOnline = false;
       this.stopRequested.set(true);
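
The new startup path above is gated entirely by configuration. As a point of reference, here is a minimal sketch (not part of this patch) of the keys read by HRegionServer and HRegionThriftServer.initialize(); the values are illustrative only, and in a real deployment they would normally be set in hbase-site.xml rather than programmatically.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class RegionThriftConfigSketch {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // Master switch: the embedded Thrift server is OFF by default.
    conf.setBoolean("hbase.regionserver.export.thrift", true);
    // Listen port for the embedded server (DEFAULT_LISTEN_PORT = 9091).
    conf.setInt("hbase.regionserver.thrift.port", 9091);
    // "compact" selects TCompactProtocol; anything else means TBinaryProtocol.
    conf.set("hbase.regionserver.thrift.protocol", "binary");
    // "framed" wraps the blocking server in TFramedTransport; the nonblocking
    // server always uses framed transport and ignores this key.
    conf.set("hbase.regionserver.thrift.transport", "framed");
    // Nonblocking (TNonblockingServer) vs. thread pool (TThreadPoolServer).
    conf.setBoolean("hbase.regionserver.thrift.nonblocking", false);
    // Bind address for the blocking server; the nonblocking server ignores it.
    conf.set("hbase.regionserver.thrift.ipaddress", "0.0.0.0");
    // On NotServingRegionException, fall back to the standard client-side
    // path instead of failing the call.
    conf.setBoolean("hbase.regionserver.thrift.redirect", true);
  }
}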

Added: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionThriftServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionThriftServer.java?rev=1181955&view=auto
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionThriftServer.java (added)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionThriftServer.java Tue Oct 11 17:44:09 2011
@@ -0,0 +1,291 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.NotServingRegionException;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.filter.ColumnPrefixFilter;
+import org.apache.hadoop.hbase.thrift.generated.AlreadyExists;
+import org.apache.hadoop.hbase.thrift.generated.BatchMutation;
+import org.apache.hadoop.hbase.thrift.generated.ColumnDescriptor;
+import org.apache.hadoop.hbase.thrift.generated.Hbase;
+import org.apache.hadoop.hbase.thrift.generated.IOError;
+import org.apache.hadoop.hbase.thrift.generated.IllegalArgument;
+import org.apache.hadoop.hbase.thrift.generated.Mutation;
+import org.apache.hadoop.hbase.thrift.generated.TCell;
+import org.apache.hadoop.hbase.thrift.generated.TRegionInfo;
+import org.apache.hadoop.hbase.thrift.generated.TRowResult;
+import org.apache.hadoop.hbase.thrift.ThriftServer;
+import org.apache.hadoop.hbase.thrift.ThriftServer.HBaseHandler;
+import org.apache.hadoop.hbase.thrift.ThriftUtilities;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.protocol.TProtocolFactory;
+import org.apache.thrift.server.THsHaServer;
+import org.apache.thrift.server.TNonblockingServer;
+import org.apache.thrift.server.TServer;
+import org.apache.thrift.server.TThreadPoolServer;
+import org.apache.thrift.transport.TFramedTransport;
+import org.apache.thrift.transport.TNonblockingServerSocket;
+import org.apache.thrift.transport.TNonblockingServerTransport;
+import org.apache.thrift.transport.TServerSocket;
+import org.apache.thrift.transport.TServerTransport;
+import org.apache.thrift.transport.TTransportFactory;
+
+/**
+ * HRegionThriftServer - this class starts up a Thrift server in the same
+ * JVM where the RegionServer is running. It inherits most of the
+ * functionality from the standard ThriftServer. This is good because
+ * we can maintain compatibility with applications that use the
+ * standard Thrift interface. For performance reasons, we override
+ * methods to directly invoke calls into the HRegionServer.
+ */
+public class HRegionThriftServer extends Thread {
+
+  public static final Log LOG = LogFactory.getLog(HRegionThriftServer.class);
+  public static final int DEFAULT_LISTEN_PORT = 9091;
+
+  private HRegionServer rs;
+  private Configuration conf;
+
+  private int port;
+  private boolean nonblocking;
+  private String bindIpAddress;
+  private String transport;
+  private String protocol;
+  volatile private TServer tserver;
+  private boolean redirect; // redirect to appropriate Regionserver
+
+  /**
+   * Create an instance of the glue object that connects the
+   * RegionServer with the standard ThriftServer implementation
+   */
+  HRegionThriftServer(HRegionServer regionServer, Configuration conf) {
+    this.rs = regionServer;
+    this.conf = conf;
+  }
+
+  /**
+   * Inherit the Handler from the standard ThriftServer. This allows us
+   * to use the default implementation for most calls. We override certain calls
+   * for performance reasons
+   */
+  private class HBaseHandlerRegion extends ThriftServer.HBaseHandler {
+
+    HBaseHandlerRegion(final Configuration conf) throws IOException {
+      super(conf);
+      initialize(conf);
+    }
+
+    /**
+     * Do increments. Short-circuit to get better performance.
+     */
+    @Override
+    public long atomicIncrement(byte[] tableName, byte [] row, byte [] family,
+                                byte [] qualifier, long amount)
+      throws IOError, IllegalArgument, TException {
+      try {
+        HTable table = getTable(tableName);
+        HRegionLocation location = table.getRegionLocation(row);
+        byte[] regionName = location.getRegionInfo().getRegionName();
+
+        return rs.incrementColumnValue(regionName, row, family,
+                                       qualifier, amount, true);
+      } catch (NotServingRegionException e) {
+        if (!redirect) {
+          throw new IOError(e.getMessage());
+        }
+        LOG.info("ThriftServer redirecting atomicIncrement");
+        return super.atomicIncrement(tableName, row, family,
+                                     qualifier, amount);
+      } catch (IOException e) {
+        throw new IOError(e.getMessage());
+      }
+    }
+
+    /**
+     * Get a record. Short-circuit to get better performance.
+     */
+    @Override
+    public List<TRowResult> getRowWithColumnsTs(byte[] tableName, byte[] row,
+                                                List<byte[]> columns,
+                                                long timestamp)
+      throws IOError {
+      try {
+        HTable table = getTable(tableName);
+        HRegionLocation location = table.getRegionLocation(row);
+        byte[] regionName = location.getRegionInfo().getRegionName();
+
+        if (columns == null) {
+          Get get = new Get(row);
+          get.setTimeRange(Long.MIN_VALUE, timestamp);
+          Result result = rs.get(regionName, get);
+          return ThriftUtilities.rowResultFromHBase(result);
+        }
+        byte[][] columnArr = columns.toArray(new byte[columns.size()][]);
+        Get get = new Get(row);
+        for (byte[] column : columnArr) {
+          byte[][] famAndQf = KeyValue.parseColumn(column);
+          if (famAndQf.length == 1) {
+            get.addFamily(famAndQf[0]);
+          } else {
+            get.addColumn(famAndQf[0], famAndQf[1]);
+          }
+        }
+        get.setTimeRange(Long.MIN_VALUE, timestamp);
+        Result result = rs.get(regionName, get);
+        return ThriftUtilities.rowResultFromHBase(result);
+      } catch (NotServingRegionException e) {
+        if (!redirect) {
+          throw new IOError(e.getMessage());
+        }
+        LOG.info("ThriftServer redirecting getRowWithColumnsTs");
+        return super.getRowWithColumnsTs(tableName, row, columns, timestamp);
+      } catch (IOException e) {
+        throw new IOError(e.getMessage());
+      }
+    }
+    @Override
+    public List<TRowResult> getRowWithColumnPrefix(byte[] tableName,
+        byte[] row, byte[] prefix) throws IOError {
+      return (getRowWithColumnPrefixTs(tableName, row, prefix,
+          HConstants.LATEST_TIMESTAMP));
+    }
+
+    @Override
+    public List<TRowResult> getRowWithColumnPrefixTs(byte[] tableName,
+        byte[] row, byte[] prefix, long timestamp) throws IOError {
+      try {
+        HTable table = getTable(tableName);
+        if (prefix == null) {
+          Get get = new Get(row);
+          get.setTimeRange(Long.MIN_VALUE, timestamp);
+          Result result = table.get(get);
+          return ThriftUtilities.rowResultFromHBase(result);
+        }
+        Get get = new Get(row);
+        byte[][] famAndPrefix = KeyValue.parseColumn(prefix);
+        if (famAndPrefix.length == 2) {
+          get.addFamily(famAndPrefix[0]);
+          get.setFilter(new ColumnPrefixFilter(famAndPrefix[1]));
+        } else {
+          get.setFilter(new ColumnPrefixFilter(famAndPrefix[0]));
+        }
+        get.setTimeRange(Long.MIN_VALUE, timestamp);
+        Result result = table.get(get);
+        return ThriftUtilities.rowResultFromHBase(result);
+      } catch (IOException e) {
+        throw new IOError(e.getMessage());
+      }
+    }
+
+  }
+
+  /**
+   * Read and initialize config parameters
+   */
+  private void initialize(Configuration conf) {
+    this.port = conf.getInt("hbase.regionserver.thrift.port",
+                            DEFAULT_LISTEN_PORT);
+    this.bindIpAddress = conf.get("hbase.regionserver.thrift.ipaddress");
+    this.protocol = conf.get("hbase.regionserver.thrift.protocol");
+    this.transport = conf.get("hbase.regionserver.thrift.transport");
+    this.nonblocking = conf.getBoolean("hbase.regionserver.thrift.nonblocking",
+                                       false);
+    this.redirect = conf.getBoolean("hbase.regionserver.thrift.redirect",
+                                       false);
+  }
+
+  /**
+   * Stop ThriftServer
+   */
+  void shutdown() {
+    if (tserver != null) {
+      tserver.stop();
+      tserver = null;
+    }
+  }
+
+  public void run() {
+    try {
+      HBaseHandlerRegion handler = new HBaseHandlerRegion(this.conf);
+      Hbase.Processor processor = new Hbase.Processor(handler);
+
+      TProtocolFactory protocolFactory;
+      if (this.protocol != null && this.protocol.equals("compact")) {
+        protocolFactory = new TCompactProtocol.Factory();
+      } else {
+        protocolFactory = new TBinaryProtocol.Factory();
+      }
+
+      if (this.nonblocking) {
+        LOG.info("starting HRegionServer Nonblocking Thrift server on " +
+                 this.port);
+        LOG.info("HRegionServer Nonblocking Thrift server does not " +
+                 "support address binding.");
+        TNonblockingServerTransport serverTransport =
+          new TNonblockingServerSocket(this.port);
+        TFramedTransport.Factory transportFactory =
+          new TFramedTransport.Factory();
+
+        tserver = new TNonblockingServer(processor, serverTransport,
+                                        transportFactory, protocolFactory);
+      } else {
+        InetAddress listenAddress = null;
+        if (this.bindIpAddress != null) {
+          listenAddress = InetAddress.getByName(this.bindIpAddress);
+        } else {
+          listenAddress = InetAddress.getLocalHost();
+        }
+        TServerTransport serverTransport = new TServerSocket(
+           new InetSocketAddress(listenAddress, port));
+
+        TTransportFactory transportFactory;
+        if (this.transport != null && this.transport.equals("framed")) {
+          transportFactory = new TFramedTransport.Factory();
+        } else {
+          transportFactory = new TTransportFactory();
+        }
+        LOG.info("starting HRegionServer ThreadPool Thrift server on " +
+                 listenAddress + ":" + this.port);
+        tserver = new TThreadPoolServer(processor, serverTransport,
+                                       transportFactory, protocolFactory);
+      }
+      tserver.serve();
+    } catch (Exception e) {
+      LOG.warn("Unable to start HRegionServerThrift interface.", e);
+    }
+  }
+}
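
For context on how this embedded server is meant to be used: any client speaking the standard Hbase.thrift interface can connect straight to a region server's Thrift port and issue the same calls as against the standalone ThriftServer, with getRowWithColumnsTs and atomicIncrement short-circuited locally. The sketch below is not part of this patch; the host, table, row and column prefix are made up, and it assumes the byte[]-based signatures of the regenerated Hbase.java in this commit.

import java.util.List;

import org.apache.hadoop.hbase.thrift.generated.Hbase;
import org.apache.hadoop.hbase.thrift.generated.TRegionInfo;
import org.apache.hadoop.hbase.thrift.generated.TRowResult;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;

public class RegionThriftClientSketch {
  public static void main(String[] args) throws Exception {
    // Connect directly to the Thrift server embedded in a region server
    // (default port 9091) rather than to the standalone ThriftServer.
    TTransport transport = new TSocket("regionserver.example.com", 9091);
    transport.open();
    Hbase.Client client = new Hbase.Client(new TBinaryProtocol(transport));

    // Backported call: every cell in the row whose column has family "info"
    // and qualifier prefix "na" (parsed by KeyValue.parseColumn on the server).
    List<TRowResult> rows = client.getRowWithColumnPrefix(
        Bytes.toBytes("usertable"),
        Bytes.toBytes("row-0001"),
        Bytes.toBytes("info:na"));

    // Backported call: locate the region hosting a row. The argument is a
    // .META. row key (roughly "table,row,..."); the returned serverName/port
    // identify the hosting region server, whose embedded Thrift port is
    // configured separately.
    TRegionInfo region = client.getRegionInfo(
        Bytes.toBytes("usertable,row-0001,99999999999999"));

    System.out.println("rows=" + rows.size() + ", region "
        + Bytes.toString(region.name) + " on "
        + Bytes.toString(region.serverName) + ":" + region.port);

    transport.close();
  }
}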

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServer.java?rev=1181955&r1=1181954&r2=1181955&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServer.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServer.java Tue Oct 11 17:44:09 2011
@@ -44,6 +44,7 @@ import org.apache.hadoop.hbase.client.Pu
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.ColumnPrefixFilter;
 import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.filter.PrefixFilter;
 import org.apache.hadoop.hbase.filter.WhileMatchFilter;
@@ -60,6 +61,7 @@ import org.apache.hadoop.hbase.thrift.ge
 import org.apache.hadoop.hbase.thrift.generated.TRowResult;
 import org.apache.hadoop.hbase.thrift.generated.TScan;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Writables;
 import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TBinaryProtocol;
 import org.apache.thrift.protocol.TCompactProtocol;
@@ -186,10 +188,14 @@ public class ThriftServer {
     /**
      * Constructs an HBaseHandler object.
      *
-     * @throws MasterNotRunningException
+     * @throws IOException
      */
-    HBaseHandler() throws MasterNotRunningException {
-      conf = HBaseConfiguration.create();
+    protected HBaseHandler() throws IOException {
+      this(HBaseConfiguration.create());
+    }
+
+    protected HBaseHandler(final Configuration c) throws IOException {
+      this.conf = c;
       admin = new HBaseAdmin(conf);
       scannerMap = new HashMap<Integer, ResultScanner>();
     }
@@ -261,6 +267,12 @@ public class ThriftServer {
           region.id = regionInfo.getRegionId();
           region.name = regionInfo.getRegionName();
           region.version = regionInfo.getVersion();
+          HServerAddress server = regionsInfo.get(regionInfo);
+          if (server != null) {
+            byte[] hostname = Bytes.toBytes(server.getHostname());
+            region.serverName = hostname;
+            region.port = server.getPort();
+          }
           regions.add(region);
         }
         return regions;
@@ -393,6 +405,40 @@ public class ThriftServer {
       }
     }
 
+    @Override
+    public List<TRowResult> getRowWithColumnPrefix(byte[] tableName,
+                                                   byte[] row, byte[] prefix) throws IOError {
+      return (getRowWithColumnPrefixTs(tableName, row, prefix,
+                                       HConstants.LATEST_TIMESTAMP));
+    }
+
+    @Override
+    public List<TRowResult> getRowWithColumnPrefixTs(byte[] tableName,
+                                                     byte[] row, byte[] prefix, long timestamp) throws IOError {
+      try {
+        HTable table = getTable(tableName);
+        if (prefix == null) {
+          Get get = new Get(row);
+          get.setTimeRange(Long.MIN_VALUE, timestamp);
+          Result result = table.get(get);
+          return ThriftUtilities.rowResultFromHBase(result);
+        }
+        Get get = new Get(row);
+        byte [][] famAndPrefix = KeyValue.parseColumn(prefix);
+        if (famAndPrefix.length == 2) {
+          get.addFamily(famAndPrefix[0]);
+          get.setFilter(new ColumnPrefixFilter(famAndPrefix[1]));
+        } else {
+          get.setFilter(new ColumnPrefixFilter(famAndPrefix[0]));
+        }
+        get.setTimeRange(Long.MIN_VALUE, timestamp);
+        Result result = table.get(get);
+        return ThriftUtilities.rowResultFromHBase(result);
+      } catch (IOException e) {
+        throw new IOError(e.getMessage());
+      }
+    }
+
     public void deleteAll(byte[] tableName, byte[] row, byte[] column)
         throws IOError {
       deleteAllTs(tableName, row, column, HConstants.LATEST_TIMESTAMP);
@@ -944,6 +990,51 @@ public class ThriftServer {
         throw new IOError(e.getMessage());
       }
     }
+
+    @Override
+    public TRegionInfo getRegionInfo(byte[] searchRow) throws IOError,
+        TException {
+      try {
+        HTable table = getTable(HConstants.META_TABLE_NAME);
+        Result startRowResult = table.getRowOrBefore(searchRow,
+            HConstants.CATALOG_FAMILY);
+
+        if (startRowResult == null) {
+          throw new IOException("Cannot find row in .META., row="
+              + Bytes.toString(searchRow));
+        }
+
+        // find region start and end keys
+        byte[] value = startRowResult.getValue(HConstants.CATALOG_FAMILY,
+            HConstants.REGIONINFO_QUALIFIER);
+        if (value == null || value.length == 0) {
+          throw new IOException("HRegionInfo REGIONINFO was null or "
+              + "empty in Meta for row=" + Bytes.toString(searchRow));
+        }
+        HRegionInfo regionInfo = Writables.getHRegionInfo(value);
+        TRegionInfo region = new TRegionInfo();
+        region.setStartKey(regionInfo.getStartKey());
+        region.setEndKey(regionInfo.getEndKey());
+        region.id = regionInfo.getRegionId();
+        region.setName(regionInfo.getRegionName());
+        region.version = regionInfo.getVersion();
+
+        // find region assignment to server
+        value = startRowResult.getValue(HConstants.CATALOG_FAMILY,
+            HConstants.SERVER_QUALIFIER);
+        if (value != null && value.length > 0) {
+          String address = Bytes.toString(value);
+          HServerAddress server = new HServerAddress(address);
+          byte[] hostname = Bytes.toBytes(server.getHostname());
+          region.serverName = hostname;
+          region.port = server.getPort();
+        }
+        return region;
+      } catch (IOException e) {
+        throw new IOError(e.getMessage());
+      }
+    }
+
   }
 
   //