You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by st...@apache.org on 2007/12/30 23:22:18 UTC

svn commit: r607602 - in /lucene/hadoop/trunk/src/contrib/hbase: ./ conf/ src/java/org/apache/hadoop/hbase/ src/java/org/apache/hadoop/hbase/io/ src/java/org/apache/hadoop/hbase/ipc/ src/test/org/apache/hadoop/hbase/io/

Author: stack
Date: Sun Dec 30 14:22:16 2007
New Revision: 607602

URL: http://svn.apache.org/viewvc?rev=607602&view=rev
Log:
HADOOP-2495 inor performance improvements: Slim-down BatchOperation, etc.

Added:
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/ipc/
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/ipc/HbaseRPC.java
Modified:
    lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt
    lucene/hadoop/trunk/src/contrib/hbase/conf/hbase-default.xml
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HConnectionManager.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMasterInterface.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStoreKey.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/io/BatchOperation.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/io/BatchUpdate.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/io/TextSequence.java
    lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/io/TestTextSequence.java

Modified: lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt?rev=607602&r1=607601&r2=607602&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt Sun Dec 30 14:22:16 2007
@@ -22,6 +22,7 @@
    HADOOP-2479 Save on number of Text object creations
    HADOOP-2485 Make mapfile index interval configurable (Set default to 32
                instead of 128)
+   HADOOP-2495 Minor performance improvements: Slim-down BatchOperation, etc. 
 
   BUG FIXES
    HADOOP-2059 In tests, exceptions in min dfs shutdown should not fail test

Modified: lucene/hadoop/trunk/src/contrib/hbase/conf/hbase-default.xml
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/conf/hbase-default.xml?rev=607602&r1=607601&r2=607602&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/conf/hbase-default.xml (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/conf/hbase-default.xml Sun Dec 30 14:22:16 2007
@@ -211,6 +211,13 @@
     skip every nth index member when reading back the index into memory.
     </description>
   </property>
+  <property>
+    <name>hbase.io.seqfile.compression.type</name>
+    <value>NONE</value>
+    <description>The compression type for hbase sequencefile.Writers
+    such as hlog.
+    </description>
+  </property>
 
   <!-- HbaseShell Configurations -->
   <property>

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HConnectionManager.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HConnectionManager.java?rev=607602&r1=607601&r2=607602&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HConnectionManager.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HConnectionManager.java Sun Dec 30 14:22:16 2007
@@ -34,11 +34,11 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.ipc.HbaseRPC;
 import org.apache.hadoop.hbase.util.Writables;
 import org.apache.hadoop.io.MapWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
 
 /**
@@ -177,7 +177,7 @@
               MASTER_ADDRESS, DEFAULT_MASTER_ADDRESS));
 
           try {
-            HMasterInterface tryMaster = (HMasterInterface)RPC.getProxy(
+            HMasterInterface tryMaster = (HMasterInterface)HbaseRPC.getProxy(
                 HMasterInterface.class, HMasterInterface.versionID, 
                 masterLocation.getInetSocketAddress(), this.conf);
             
@@ -360,13 +360,11 @@
           try {
             versionId =
               serverInterfaceClass.getDeclaredField("versionID").getLong(server);
-
           } catch (IllegalAccessException e) {
             // Should never happen unless visibility of versionID changes
             throw new UnsupportedOperationException(
                 "Unable to open a connection to a " +
                 serverInterfaceClass.getName() + " server.", e);
-
           } catch (NoSuchFieldException e) {
             // Should never happen unless versionID field name changes in HRegionInterface
             throw new UnsupportedOperationException(
@@ -375,13 +373,11 @@
           }
 
           try {
-            server = (HRegionInterface) RPC.waitForProxy(serverInterfaceClass,
+            server = (HRegionInterface)HbaseRPC.waitForProxy(serverInterfaceClass,
                 versionId, regionServer.getInetSocketAddress(), this.conf);
-
           } catch (RemoteException e) {
             throw RemoteExceptionHandler.decodeRemoteException(e);
           }
-
           this.servers.put(regionServer.toString(), server);
         }
       }

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java?rev=607602&r1=607601&r2=607602&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java Sun Dec 30 14:22:16 2007
@@ -40,6 +40,7 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.SequenceFile.Reader;
 
 /**
@@ -177,7 +178,7 @@
                   "; map content " + logWriters.toString());
               }
               w = SequenceFile.createWriter(fs, conf, logfile, HLogKey.class,
-                HLogEdit.class);
+                HLogEdit.class, getCompressionType(conf));
               // Use copy of regionName; regionName object is reused inside in
               // HStoreKey.getRegionName so its content changes as we iterate.
               logWriters.put(new Text(regionName), w);
@@ -238,6 +239,16 @@
     fs.mkdirs(dir);
     rollWriter();
   }
+  
+  /**
+   * Get the compression type for the hlog files.
+   * @param c Configuration to use.
+   * @return the kind of compression to use
+   */
+  private static CompressionType getCompressionType(final Configuration c) {
+    String name = c.get("hbase.io.seqfile.compression.type");
+    return name == null? CompressionType.NONE: CompressionType.valueOf(name);
+  }
 
   /**
    * Called by HRegionServer when it opens a new region to ensure that log
@@ -298,7 +309,7 @@
         }
         Path newPath = computeFilename(filenum++);
         this.writer = SequenceFile.createWriter(this.fs, this.conf, newPath,
-            HLogKey.class, HLogEdit.class);
+            HLogKey.class, HLogEdit.class, getCompressionType(this.conf));
         LOG.info("new log writer created at " + newPath);
 
         // Can we delete any of the old log files?

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java?rev=607602&r1=607601&r2=607602&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java Sun Dec 30 14:22:16 2007
@@ -51,6 +51,7 @@
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hbase.io.BatchUpdate;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.ipc.HbaseRPC;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.InfoServer;
 import org.apache.hadoop.hbase.util.Sleeper;
@@ -59,7 +60,6 @@
 import org.apache.hadoop.io.MapWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.ipc.Server;
 
@@ -919,7 +919,7 @@
     this.serverLeases = new Leases(this.leaseTimeout, 
         conf.getInt("hbase.master.lease.thread.wakefrequency", 15 * 1000));
     
-    this.server = RPC.getServer(this, address.getBindAddress(),
+    this.server = HbaseRPC.getServer(this, address.getBindAddress(),
         address.getPort(), conf.getInt("hbase.regionserver.handler.count", 10),
         false, conf);
 

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMasterInterface.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMasterInterface.java?rev=607602&r1=607601&r2=607602&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMasterInterface.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMasterInterface.java Sun Dec 30 14:22:16 2007
@@ -30,8 +30,11 @@
  * tables.
  */
 public interface HMasterInterface extends VersionedProtocol {
-  /** Interface version */
-  public static final long versionID = 1L;
+  /**
+   * Interface version.
+   * Version was incremented to 2 when we brought the hadoop RPC local to hbase.
+   */
+  public static final long versionID = 2L;
 
   /** @return true if master is available */
   public boolean isMasterRunning();

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java?rev=607602&r1=607601&r2=607602&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java Sun Dec 30 14:22:16 2007
@@ -1155,15 +1155,12 @@
       for (BatchOperation op: b) {
         HStoreKey key = new HStoreKey(row, op.getColumn(), commitTime);
         byte[] val = null;
-        switch(op.getOp()) {
-        case PUT:
+        if (op.isPut()) {
           val = op.getValue();
           if (HLogEdit.isDeleted(val)) {
             throw new IOException("Cannot insert value: " + val);
           }
-          break;
-
-        case DELETE:
+        } else {
           if (timestamp == LATEST_TIMESTAMP) {
             // Save off these deletes
             if (deletes == null) {
@@ -1173,7 +1170,6 @@
           } else {
             val = HLogEdit.deleteBytes.get();
           }
-          break;
         }
         if (val != null) {
           localput(lockid, key, val);

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java?rev=607602&r1=607601&r2=607602&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java Sun Dec 30 14:22:16 2007
@@ -54,6 +54,7 @@
 import org.apache.hadoop.hbase.filter.RowFilterInterface;
 import org.apache.hadoop.hbase.io.BatchUpdate;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.ipc.HbaseRPC;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.InfoServer;
 import org.apache.hadoop.hbase.util.Sleeper;
@@ -62,7 +63,6 @@
 import org.apache.hadoop.io.MapWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.net.DNS;
 import org.apache.hadoop.util.StringUtils;
@@ -661,7 +661,7 @@
     this.workerThread = new Thread(worker);
     this.sleeper = new Sleeper(this.msgInterval, this.stopRequested);
     // Server to handle client requests
-    this.server = RPC.getServer(this, address.getBindAddress(), 
+    this.server = HbaseRPC.getServer(this, address.getBindAddress(), 
       address.getPort(), conf.getInt("hbase.regionserver.handler.count", 10),
       false, conf);
     this.serverInfo = new HServerInfo(new HServerAddress(
@@ -1060,7 +1060,7 @@
       LOG.debug("Telling master we are up");
     }
     // Do initial RPC setup.
-    this.hbaseMaster = (HMasterRegionInterface)RPC.waitForProxy(
+    this.hbaseMaster = (HMasterRegionInterface)HbaseRPC.waitForProxy(
       HMasterRegionInterface.class, HMasterRegionInterface.versionID,
       new HServerAddress(conf.get(MASTER_ADDRESS)).getInetSocketAddress(),
       this.conf);

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStoreKey.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStoreKey.java?rev=607602&r1=607601&r2=607602&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStoreKey.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStoreKey.java Sun Dec 30 14:22:16 2007
@@ -23,6 +23,7 @@
 import org.apache.hadoop.io.*;
 
 import java.io.*;
+import java.nio.ByteBuffer;
 
 /**
  * A Key for a stored row
@@ -225,7 +226,7 @@
   // Comparable
 
   public int compareTo(Object o) {
-    HStoreKey other = (HStoreKey) o;
+    HStoreKey other = (HStoreKey)o;
     int result = this.row.compareTo(other.row);
     if (result != 0) {
       return result;
@@ -322,9 +323,11 @@
   private static int getColonOffset(final Text col)
   throws InvalidColumnNameException {
     int offset = -1;
-    for (int i = 0; i < col.getLength(); i++) {
-      if (col.charAt(i) == COLUMN_FAMILY_DELIMITER) {
-        offset = i;
+    ByteBuffer bb = ByteBuffer.wrap(col.getBytes());
+    for (int lastPosition = bb.position(); bb.hasRemaining();
+        lastPosition = bb.position()) {
+      if (Text.bytesToCodePoint(bb) == COLUMN_FAMILY_DELIMITER) {
+        offset = lastPosition;
         break;
       }
     }

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/io/BatchOperation.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/io/BatchOperation.java?rev=607602&r1=607601&r2=607602&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/io/BatchOperation.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/io/BatchOperation.java Sun Dec 30 14:22:16 2007
@@ -27,56 +27,38 @@
 import org.apache.hadoop.io.Writable;
 
 /**
- * Batch update operations such as put, delete, and deleteAll.
+ * Batch update operation.
+ * 
+ * If value is null, its a DELETE operation.  If its non-null, its a PUT.
+ * This object is purposely bare-bones because many instances are created
+ * during bulk uploads.  We have one class for DELETEs and PUTs rather than
+ * a class per type because it makes the serialization easier.
+ * @see BatchUpdate 
  */
 public class BatchOperation implements Writable {
-  /** 
-   * Operation types.
-   * @see org.apache.hadoop.io.SequenceFile.Writer
-   */
-  public static enum Operation {
-    /** update a field */
-    PUT,
-    /** delete a field */
-    DELETE}
-
-  private Operation op;
   private Text column;
-  private byte[] value;
   
-  /** default constructor used by Writable */
+  // A null value defines DELETE operations.
+  private byte[] value;
+
+  /** Default constructor used by Writable */
   public BatchOperation() {
     this(new Text());
   }
   /**
-   * Creates a DELETE operation
-   * 
+   * Creates a DELETE batch operation.
    * @param column column name
    */
   public BatchOperation(final Text column) {
-    this(Operation.DELETE, column, null);
+    this(column, null);
   }
 
   /**
-   * Creates a PUT operation
-   * 
+   * Create a batch operation.
    * @param column column name
-   * @param value column value
+   * @param value column value.  If non-null, this is a PUT operation.
    */
   public BatchOperation(final Text column, final byte [] value) {
-    this(Operation.PUT, column, value);
-  }
-  
-  /**
-   * Creates a put operation
-   *
-   * @param operation the operation (put or get)
-   * @param column column name
-   * @param value column value
-   */
-  public BatchOperation(final Operation operation, final Text column,
-      final byte[] value) {
-    this.op = operation;
     this.column = column;
     this.value = value;
   }
@@ -85,47 +67,42 @@
    * @return the column
    */
   public Text getColumn() {
-    return column;
+    return this.column;
   }
 
   /**
-   * @return the operation
+   * @return the value
    */
-  public Operation getOp() {
-    return this.op;
+  public byte[] getValue() {
+    return this.value;
   }
 
   /**
-   * @return the value
+   * @return True if this is a PUT operation (this.value is not null).
    */
-  public byte[] getValue() {
-    return value;
+  public boolean isPut() {
+    return this.value != null;
   }
   
-  //
-  // Writable
-  //
+  // Writable methods
 
-  /**
-   * {@inheritDoc}
-   */
-  public void readFields(DataInput in) throws IOException {
-    int ordinal = in.readInt();
-    this.op = Operation.values()[ordinal];
-    column.readFields(in);
-    if (this.op == Operation.PUT) {
-      value = new byte[in.readInt()];
-      in.readFully(value);
+  // This is a hotspot when updating deserializing incoming client submissions.
+  // In Performance Evaluation sequentialWrite, 70% of object allocations are
+  // done in here.
+  public void readFields(final DataInput in) throws IOException {
+    this.column.readFields(in);
+    // Is there a value to read?
+    if (in.readBoolean()) {
+      this.value = new byte[in.readInt()];
+      in.readFully(this.value);
     }
   }
 
-  /**
-   * {@inheritDoc}
-   */
-  public void write(DataOutput out) throws IOException {
-    out.writeInt(this.op.ordinal());
-    column.write(out);
-    if (this.op == Operation.PUT) {
+  public void write(final DataOutput out) throws IOException {
+    this.column.write(out);
+    boolean p = isPut();
+    out.writeBoolean(p);
+    if (p) {
       out.writeInt(value.length);
       out.write(value);
     }

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/io/BatchUpdate.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/io/BatchUpdate.java?rev=607602&r1=607601&r2=607602&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/io/BatchUpdate.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/io/BatchUpdate.java Sun Dec 30 14:22:16 2007
@@ -99,13 +99,17 @@
    *
    * @param lid lock id returned from startUpdate
    * @param column column whose value is being set
-   * @param val new value for column
+   * @param val new value for column.  Cannot be null (can be empty).
    */
   public synchronized void put(final long lid, final Text column,
       final byte val[]) {
     if(this.lockid != lid) {
       throw new IllegalArgumentException("invalid lockid " + lid);
     }
+    if (val == null) {
+      // If null, the PUT becomes a DELETE operation.
+      throw new IllegalArgumentException("Passed value cannot be null");
+    }
     operations.add(new BatchOperation(column, val));
   }
   
@@ -138,10 +142,7 @@
   // Writable
   //
 
-  /**
-   * {@inheritDoc}
-   */
-  public void readFields(DataInput in) throws IOException {
+  public void readFields(final DataInput in) throws IOException {
     row.readFields(in);
     int nOps = in.readInt();
     for (int i = 0; i < nOps; i++) {
@@ -151,14 +152,11 @@
     }
   }
 
-  /**
-   * {@inheritDoc}
-   */
-  public void write(DataOutput out) throws IOException {
+  public void write(final DataOutput out) throws IOException {
     row.write(out);
     out.writeInt(operations.size());
     for (BatchOperation op: operations) {
       op.write(out);
     }
   }
-}
+}
\ No newline at end of file

Added: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java?rev=607602&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java (added)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/io/HbaseObjectWritable.java Sun Dec 30 14:22:16 2007
@@ -0,0 +1,274 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.io;
+
+import java.lang.reflect.Array;
+
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.conf.*;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableFactories;
+
+/** A polymorphic Writable that writes an instance with it's class name.
+ * Handles arrays, strings and primitive types without a Writable wrapper.
+ * 
+ * This is a copy of the hadoop version renamed.  Removes UTF8 (HADOOP-414).
+ * Using Text intead of UTF-8 saves ~2% CPU between reading and writing objects
+ * running a short sequentialWrite Performance Evaluation test just in
+ * ObjectWritable alone; more when we're doing randomRead-ing.  Other
+ * optimizations include our passing codes for classes instead of the
+ * actual class names themselves.
+ * 
+ * <p>Has other optimizations passing codes instead of class names.
+ */
+public class HbaseObjectWritable implements Writable, Configurable {
+
+  private Class declaredClass;
+  private Object instance;
+  private Configuration conf;
+
+  public HbaseObjectWritable() {}
+  
+  public HbaseObjectWritable(Object instance) {
+    set(instance);
+  }
+
+  public HbaseObjectWritable(Class declaredClass, Object instance) {
+    this.declaredClass = declaredClass;
+    this.instance = instance;
+  }
+
+  /** Return the instance, or null if none. */
+  public Object get() { return instance; }
+  
+  /** Return the class this is meant to be. */
+  public Class getDeclaredClass() { return declaredClass; }
+  
+  /** Reset the instance. */
+  public void set(Object instance) {
+    this.declaredClass = instance.getClass();
+    this.instance = instance;
+  }
+  
+  public String toString() {
+    return "OW[class=" + declaredClass + ",value=" + instance + "]";
+  }
+
+  
+  public void readFields(DataInput in) throws IOException {
+    readObject(in, this, this.conf);
+  }
+  
+  public void write(DataOutput out) throws IOException {
+    writeObject(out, instance, declaredClass, conf);
+  }
+
+  private static final Map<String, Class<?>> PRIMITIVE_NAMES = new HashMap<String, Class<?>>();
+  static {
+    PRIMITIVE_NAMES.put("boolean", Boolean.TYPE);
+    PRIMITIVE_NAMES.put("byte", Byte.TYPE);
+    PRIMITIVE_NAMES.put("char", Character.TYPE);
+    PRIMITIVE_NAMES.put("short", Short.TYPE);
+    PRIMITIVE_NAMES.put("int", Integer.TYPE);
+    PRIMITIVE_NAMES.put("long", Long.TYPE);
+    PRIMITIVE_NAMES.put("float", Float.TYPE);
+    PRIMITIVE_NAMES.put("double", Double.TYPE);
+    PRIMITIVE_NAMES.put("void", Void.TYPE);
+  }
+
+  private static class NullInstance extends Configured implements Writable {
+    private Class<?> declaredClass;
+    public NullInstance() { super(null); }
+    public NullInstance(Class declaredClass, Configuration conf) {
+      super(conf);
+      this.declaredClass = declaredClass;
+    }
+    public void readFields(DataInput in) throws IOException {
+      String className = Text.readString(in);
+      declaredClass = PRIMITIVE_NAMES.get(className);
+      if (declaredClass == null) {
+        try {
+          declaredClass = getConf().getClassByName(className);
+        } catch (ClassNotFoundException e) {
+          throw new RuntimeException(e.toString());
+        }
+      }
+    }
+    public void write(DataOutput out) throws IOException {
+      Text.writeString(out, declaredClass.getName());
+    }
+  }
+
+  /** Write a {@link Writable}, {@link String}, primitive type, or an array of
+   * the preceding. */
+  public static void writeObject(DataOutput out, Object instance,
+                                 Class declaredClass, 
+                                 Configuration conf) throws IOException {
+
+    if (instance == null) {                       // null
+      instance = new NullInstance(declaredClass, conf);
+      declaredClass = Writable.class;
+    }
+
+    Text.writeString(out, declaredClass.getName()); // always write declared
+
+    if (declaredClass.isArray()) {                // array
+      int length = Array.getLength(instance);
+      out.writeInt(length);
+      for (int i = 0; i < length; i++) {
+        writeObject(out, Array.get(instance, i),
+                    declaredClass.getComponentType(), conf);
+      }
+      
+    } else if (declaredClass == String.class) {   // String
+      Text.writeString(out, (String)instance);
+      
+    } else if (declaredClass.isPrimitive()) {     // primitive type
+
+      if (declaredClass == Boolean.TYPE) {        // boolean
+        out.writeBoolean(((Boolean)instance).booleanValue());
+      } else if (declaredClass == Character.TYPE) { // char
+        out.writeChar(((Character)instance).charValue());
+      } else if (declaredClass == Byte.TYPE) {    // byte
+        out.writeByte(((Byte)instance).byteValue());
+      } else if (declaredClass == Short.TYPE) {   // short
+        out.writeShort(((Short)instance).shortValue());
+      } else if (declaredClass == Integer.TYPE) { // int
+        out.writeInt(((Integer)instance).intValue());
+      } else if (declaredClass == Long.TYPE) {    // long
+        out.writeLong(((Long)instance).longValue());
+      } else if (declaredClass == Float.TYPE) {   // float
+        out.writeFloat(((Float)instance).floatValue());
+      } else if (declaredClass == Double.TYPE) {  // double
+        out.writeDouble(((Double)instance).doubleValue());
+      } else if (declaredClass == Void.TYPE) {    // void
+      } else {
+        throw new IllegalArgumentException("Not a primitive: "+declaredClass);
+      }
+    } else if (declaredClass.isEnum()) {         // enum
+      Text.writeString(out, ((Enum)instance).name());
+    } else if (Writable.class.isAssignableFrom(declaredClass)) { // Writable
+      Text.writeString(out, instance.getClass().getName());
+      ((Writable)instance).write(out);
+
+    } else {
+      throw new IOException("Can't write: "+instance+" as "+declaredClass);
+    }
+  }
+  
+  
+  /** Read a {@link Writable}, {@link String}, primitive type, or an array of
+   * the preceding. */
+  public static Object readObject(DataInput in, Configuration conf)
+    throws IOException {
+    return readObject(in, null, conf);
+  }
+    
+  /** Read a {@link Writable}, {@link String}, primitive type, or an array of
+   * the preceding. */
+  @SuppressWarnings("unchecked")
+  public static Object readObject(DataInput in, HbaseObjectWritable objectWritable, Configuration conf)
+    throws IOException {
+    String className = Text.readString(in);
+    Class<?> declaredClass = PRIMITIVE_NAMES.get(className);
+    if (declaredClass == null) {
+      try {
+        declaredClass = conf.getClassByName(className);
+      } catch (ClassNotFoundException e) {
+        throw new RuntimeException("readObject can't find class", e);
+      }
+    }    
+
+    Object instance;
+    
+    if (declaredClass.isPrimitive()) {            // primitive types
+
+      if (declaredClass == Boolean.TYPE) {             // boolean
+        instance = Boolean.valueOf(in.readBoolean());
+      } else if (declaredClass == Character.TYPE) {    // char
+        instance = Character.valueOf(in.readChar());
+      } else if (declaredClass == Byte.TYPE) {         // byte
+        instance = Byte.valueOf(in.readByte());
+      } else if (declaredClass == Short.TYPE) {        // short
+        instance = Short.valueOf(in.readShort());
+      } else if (declaredClass == Integer.TYPE) {      // int
+        instance = Integer.valueOf(in.readInt());
+      } else if (declaredClass == Long.TYPE) {         // long
+        instance = Long.valueOf(in.readLong());
+      } else if (declaredClass == Float.TYPE) {        // float
+        instance = Float.valueOf(in.readFloat());
+      } else if (declaredClass == Double.TYPE) {       // double
+        instance = Double.valueOf(in.readDouble());
+      } else if (declaredClass == Void.TYPE) {         // void
+        instance = null;
+      } else {
+        throw new IllegalArgumentException("Not a primitive: "+declaredClass);
+      }
+
+    } else if (declaredClass.isArray()) {              // array
+      int length = in.readInt();
+      instance = Array.newInstance(declaredClass.getComponentType(), length);
+      for (int i = 0; i < length; i++) {
+        Array.set(instance, i, readObject(in, conf));
+      }
+      
+    } else if (declaredClass == String.class) {        // String
+      instance = Text.readString(in);
+    } else if (declaredClass.isEnum()) {         // enum
+      instance = Enum.valueOf((Class<? extends Enum>) declaredClass, Text.readString(in));
+    } else {                                      // Writable
+      Class instanceClass = null;
+      try {
+        instanceClass = conf.getClassByName(Text.readString(in));
+      } catch (ClassNotFoundException e) {
+        throw new RuntimeException("readObject can't find class", e);
+      }
+      
+      Writable writable = WritableFactories.newInstance(instanceClass, conf);
+      writable.readFields(in);
+      instance = writable;
+
+      if (instanceClass == NullInstance.class) {  // null
+        declaredClass = ((NullInstance)instance).declaredClass;
+        instance = null;
+      }
+    }
+
+    if (objectWritable != null) {                 // store values
+      objectWritable.declaredClass = declaredClass;
+      objectWritable.instance = instance;
+    }
+
+    return instance;
+      
+  }
+
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  public Configuration getConf() {
+    return this.conf;
+  }
+  
+}

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/io/TextSequence.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/io/TextSequence.java?rev=607602&r1=607601&r2=607602&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/io/TextSequence.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/io/TextSequence.java Sun Dec 30 14:22:16 2007
@@ -127,8 +127,9 @@
 
   public int hashCode() {
     int hash = 1;
-    for (int i = this.start; i < getLength(); i++)
-      hash = (31 * hash) + this.delegatee.getBytes()[i];
+    byte [] b = this.delegatee.getBytes();
+    for (int i = this.start, length = getLength(); i < length; i++)
+      hash = (31 * hash) + b[i];
     return hash;
   }
 

Added: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/ipc/HbaseRPC.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/ipc/HbaseRPC.java?rev=607602&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/ipc/HbaseRPC.java (added)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/ipc/HbaseRPC.java Sun Dec 30 14:22:16 2007
@@ -0,0 +1,442 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.ipc;
+
+import java.lang.reflect.Proxy;
+import java.lang.reflect.Method;
+import java.lang.reflect.Array;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.InvocationTargetException;
+
+import java.net.ConnectException;
+import java.net.InetSocketAddress;
+import java.net.SocketTimeoutException;
+import java.io.*;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Collection;
+
+import javax.net.SocketFactory;
+
+import org.apache.commons.logging.*;
+
+import org.apache.hadoop.hbase.io.HbaseObjectWritable;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.ipc.Client;
+import org.apache.hadoop.ipc.VersionedProtocol;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.conf.*;
+
+/** A simple RPC mechanism.
+ * 
+ * This is a local hbase copy of the hadoop RPC so we can do things like
+ * address HADOOP-414 for hbase-only and try other hbase-specific
+ * optimizations like using our own version of ObjectWritable.  Class has been
+ * renamed to avoid confusing it w/ hadoop versions.
+ * 
+ * <p>Below are continued the class comments from hadoop RPC class.
+ *
+ * A <i>protocol</i> is a Java interface.  All parameters and return types must
+ * be one of:
+ *
+ * <ul> <li>a primitive type, <code>boolean</code>, <code>byte</code>,
+ * <code>char</code>, <code>short</code>, <code>int</code>, <code>long</code>,
+ * <code>float</code>, <code>double</code>, or <code>void</code>; or</li>
+ *
+ * <li>a {@link String}; or</li>
+ *
+ * <li>a {@link Writable}; or</li>
+ *
+ * <li>an array of the above types</li> </ul>
+ *
+ * All methods in the protocol should throw only IOException.  No field data of
+ * the protocol instance is transmitted.
+ * 
+ * @see org.apache.hadoop.ipc.RPC
+ */
+public class HbaseRPC {
+  private static final Log LOG =
+    LogFactory.getLog("org.apache.hadoop.ipc.RPC");
+
+  private HbaseRPC() {}                                  // no public ctor
+
+
+  /** A method invocation, including the method name and its parameters.*/
+  private static class Invocation implements Writable, Configurable {
+    private String methodName;
+    private Class[] parameterClasses;
+    private Object[] parameters;
+    private Configuration conf;
+
+    public Invocation() {}
+
+    public Invocation(Method method, Object[] parameters) {
+      this.methodName = method.getName();
+      this.parameterClasses = method.getParameterTypes();
+      this.parameters = parameters;
+    }
+
+    /** The name of the method invoked. */
+    public String getMethodName() { return methodName; }
+
+    /** The parameter classes. */
+    public Class[] getParameterClasses() { return parameterClasses; }
+
+    /** The parameter instances. */
+    public Object[] getParameters() { return parameters; }
+
+    public void readFields(DataInput in) throws IOException {
+      methodName = Text.readString(in);
+      parameters = new Object[in.readInt()];
+      parameterClasses = new Class[parameters.length];
+      HbaseObjectWritable objectWritable = new HbaseObjectWritable();
+      for (int i = 0; i < parameters.length; i++) {
+        parameters[i] = HbaseObjectWritable.readObject(in, objectWritable, this.conf);
+        parameterClasses[i] = objectWritable.getDeclaredClass();
+      }
+    }
+
+    public void write(DataOutput out) throws IOException {
+      Text.writeString(out, methodName);
+      out.writeInt(parameterClasses.length);
+      for (int i = 0; i < parameterClasses.length; i++) {
+        HbaseObjectWritable.writeObject(out, parameters[i], parameterClasses[i],
+                                   conf);
+      }
+    }
+
+    public String toString() {
+      StringBuffer buffer = new StringBuffer();
+      buffer.append(methodName);
+      buffer.append("(");
+      for (int i = 0; i < parameters.length; i++) {
+        if (i != 0)
+          buffer.append(", ");
+        buffer.append(parameters[i]);
+      }
+      buffer.append(")");
+      return buffer.toString();
+    }
+
+    public void setConf(Configuration conf) {
+      this.conf = conf;
+    }
+
+    public Configuration getConf() {
+      return this.conf;
+    }
+
+  }
+
+  private static Map<SocketFactory, Client> CLIENTS =
+      new HashMap<SocketFactory, Client>();
+
+  private static synchronized Client getClient(Configuration conf,
+      SocketFactory factory) {
+    // Construct & cache client.  The configuration is only used for timeout,
+    // and Clients have connection pools.  So we can either (a) lose some
+    // connection pooling and leak sockets, or (b) use the same timeout for all
+    // configurations.  Since the IPC is usually intended globally, not
+    // per-job, we choose (a).
+    Client client = CLIENTS.get(factory);
+    if (client == null) {
+      client = new Client(HbaseObjectWritable.class, conf, factory);
+      CLIENTS.put(factory, client);
+    }
+    return client;
+  }
+  
+  /**
+   * Construct & cache client with the default SocketFactory.
+   * @param conf
+   * @return
+   */
+  private static Client getClient(Configuration conf) {
+    return getClient(conf, SocketFactory.getDefault());
+  }
+
+  /**
+   * Stop all RPC client connections
+   */
+  public static synchronized void stopClient(){
+    for (Client client : CLIENTS.values())
+      client.stop();
+    CLIENTS.clear();
+  }
+
+  /*
+   * remove specified client from the list of clients.
+   */
+  static synchronized void removeClients() {
+    CLIENTS.clear();
+  }
+
+  static synchronized Collection allClients() {
+    return CLIENTS.values();
+  }
+
+  private static class Invoker implements InvocationHandler {
+    private InetSocketAddress address;
+    private UserGroupInformation ticket;
+    private Client client;
+
+    public Invoker(InetSocketAddress address, UserGroupInformation ticket, 
+                   Configuration conf, SocketFactory factory) {
+      this.address = address;
+      this.ticket = ticket;
+      this.client = getClient(conf, factory);
+    }
+
+    public Object invoke(Object proxy, Method method, Object[] args)
+      throws Throwable {
+      long startTime = System.currentTimeMillis();
+      HbaseObjectWritable value = (HbaseObjectWritable)
+        client.call(new Invocation(method, args), address, ticket);
+      long callTime = System.currentTimeMillis() - startTime;
+      LOG.debug("Call: " + method.getName() + " " + callTime);
+      return value.get();
+    }
+  }
+
+  /**
+   * A version mismatch for the RPC protocol.
+   */
+  public static class VersionMismatch extends IOException {
+    private String interfaceName;
+    private long clientVersion;
+    private long serverVersion;
+    
+    /**
+     * Create a version mismatch exception
+     * @param interfaceName the name of the protocol mismatch
+     * @param clientVersion the client's version of the protocol
+     * @param serverVersion the server's version of the protocol
+     */
+    public VersionMismatch(String interfaceName, long clientVersion,
+                           long serverVersion) {
+      super("Protocol " + interfaceName + " version mismatch. (client = " +
+            clientVersion + ", server = " + serverVersion + ")");
+      this.interfaceName = interfaceName;
+      this.clientVersion = clientVersion;
+      this.serverVersion = serverVersion;
+    }
+    
+    /**
+     * Get the interface name
+     * @return the java class name 
+     *          (eg. org.apache.hadoop.mapred.InterTrackerProtocol)
+     */
+    public String getInterfaceName() {
+      return interfaceName;
+    }
+    
+    /**
+     * Get the client's prefered version
+     */
+    public long getClientVersion() {
+      return clientVersion;
+    }
+    
+    /**
+     * Get the server's agreed to version.
+     */
+    public long getServerVersion() {
+      return serverVersion;
+    }
+  }
+  
+  public static VersionedProtocol waitForProxy(Class protocol,
+                                               long clientVersion,
+                                               InetSocketAddress addr,
+                                               Configuration conf
+                                               ) throws IOException {
+    while (true) {
+      try {
+        return getProxy(protocol, clientVersion, addr, conf);
+      } catch(ConnectException se) {  // namenode has not been started
+        LOG.info("Server at " + addr + " not available yet, Zzzzz...");
+      } catch(SocketTimeoutException te) {  // namenode is busy
+        LOG.info("Problem connecting to server: " + addr);
+      }
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException ie) {
+        // IGNORE
+      }
+    }
+  }
+  /** Construct a client-side proxy object that implements the named protocol,
+   * talking to a server at the named address. */
+  public static VersionedProtocol getProxy(Class<?> protocol,
+      long clientVersion, InetSocketAddress addr, Configuration conf,
+      SocketFactory factory) throws IOException {
+    return getProxy(protocol, clientVersion, addr, null, conf, factory);
+  }
+  
+  /** Construct a client-side proxy object that implements the named protocol,
+   * talking to a server at the named address. */
+  public static VersionedProtocol getProxy(Class<?> protocol,
+      long clientVersion, InetSocketAddress addr, UserGroupInformation ticket,
+      Configuration conf, SocketFactory factory) throws IOException {    
+
+    VersionedProtocol proxy =
+        (VersionedProtocol) Proxy.newProxyInstance(
+            protocol.getClassLoader(), new Class[] { protocol },
+            new Invoker(addr, ticket, conf, factory));
+    long serverVersion = proxy.getProtocolVersion(protocol.getName(), 
+                                                  clientVersion);
+    if (serverVersion == clientVersion) {
+      return proxy;
+    } else {
+      throw new VersionMismatch(protocol.getName(), clientVersion, 
+                                serverVersion);
+    }
+  }
+
+  /**
+   * Construct a client-side proxy object with the default SocketFactory
+   * 
+   * @param protocol
+   * @param clientVersion
+   * @param addr
+   * @param conf
+   * @return a proxy instance
+   * @throws IOException
+   */
+  public static VersionedProtocol getProxy(Class<?> protocol,
+      long clientVersion, InetSocketAddress addr, Configuration conf)
+      throws IOException {
+
+    return getProxy(protocol, clientVersion, addr, conf, NetUtils
+        .getDefaultSocketFactory(conf));
+  }
+
+  /** Expert: Make multiple, parallel calls to a set of servers. */
+  public static Object[] call(Method method, Object[][] params,
+                              InetSocketAddress[] addrs, Configuration conf)
+    throws IOException {
+
+    Invocation[] invocations = new Invocation[params.length];
+    for (int i = 0; i < params.length; i++)
+      invocations[i] = new Invocation(method, params[i]);
+    Writable[] wrappedValues = getClient(conf).call(invocations, addrs);
+    
+    if (method.getReturnType() == Void.TYPE) {
+      return null;
+    }
+
+    Object[] values =
+      (Object[])Array.newInstance(method.getReturnType(), wrappedValues.length);
+    for (int i = 0; i < values.length; i++)
+      if (wrappedValues[i] != null)
+        values[i] = ((HbaseObjectWritable)wrappedValues[i]).get();
+    
+    return values;
+  }
+
+  /** Construct a server for a protocol implementation instance listening on a
+   * port and address. */
+  public static Server getServer(final Object instance, final String bindAddress, final int port, Configuration conf) 
+    throws IOException {
+    return getServer(instance, bindAddress, port, 1, false, conf);
+  }
+
+  /** Construct a server for a protocol implementation instance listening on a
+   * port and address. */
+  public static Server getServer(final Object instance, final String bindAddress, final int port,
+                                 final int numHandlers,
+                                 final boolean verbose, Configuration conf) 
+    throws IOException {
+    return new Server(instance, conf, bindAddress, port, numHandlers, verbose);
+  }
+
+  /** An RPC Server. */
+  public static class Server extends org.apache.hadoop.ipc.Server {
+    private Object instance;
+    private Class<?> implementation;
+    private boolean verbose;
+
+    /** Construct an RPC server.
+     * @param instance the instance whose methods will be called
+     * @param conf the configuration to use
+     * @param bindAddress the address to bind on to listen for connection
+     * @param port the port to listen for connections on
+     */
+    public Server(Object instance, Configuration conf, String bindAddress, int port) 
+      throws IOException {
+      this(instance, conf,  bindAddress, port, 1, false);
+    }
+
+    /** Construct an RPC server.
+     * @param instance the instance whose methods will be called
+     * @param conf the configuration to use
+     * @param bindAddress the address to bind on to listen for connection
+     * @param port the port to listen for connections on
+     * @param numHandlers the number of method handler threads to run
+     * @param verbose whether each call should be logged
+     */
+    public Server(Object instance, Configuration conf, String bindAddress,  int port,
+                  int numHandlers, boolean verbose) throws IOException {
+      super(bindAddress, port, Invocation.class, numHandlers, conf);
+      this.instance = instance;
+      this.implementation = instance.getClass();
+      this.verbose = verbose;
+    }
+
+    public Writable call(Writable param) throws IOException {
+      try {
+        Invocation call = (Invocation)param;
+        if (verbose) log("Call: " + call);
+        
+        Method method =
+          implementation.getMethod(call.getMethodName(),
+                                   call.getParameterClasses());
+
+        long startTime = System.currentTimeMillis();
+        Object value = method.invoke(instance, call.getParameters());
+        long callTime = System.currentTimeMillis() - startTime;
+        LOG.debug("Served: " + call.getMethodName() + " " + callTime);
+        if (verbose) log("Return: "+value);
+
+        return new HbaseObjectWritable(method.getReturnType(), value);
+
+      } catch (InvocationTargetException e) {
+        Throwable target = e.getTargetException();
+        if (target instanceof IOException) {
+          throw (IOException)target;
+        } else {
+          IOException ioe = new IOException(target.toString());
+          ioe.setStackTrace(target.getStackTrace());
+          throw ioe;
+        }
+      } catch (Throwable e) {
+        IOException ioe = new IOException(e.toString());
+        ioe.setStackTrace(e.getStackTrace());
+        throw ioe;
+      }
+    }
+  }
+
+  private static void log(String value) {
+    if (value!= null && value.length() > 55)
+      value = value.substring(0, 55)+"...";
+    LOG.info(value);
+  }
+}

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/io/TestTextSequence.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/io/TestTextSequence.java?rev=607602&r1=607601&r2=607602&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/io/TestTextSequence.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/io/TestTextSequence.java Sun Dec 30 14:22:16 2007
@@ -19,11 +19,6 @@
  */
 package org.apache.hadoop.hbase.io;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-
 import org.apache.hadoop.hbase.HBaseTestCase;
 import org.apache.hadoop.io.Text;
 
@@ -54,19 +49,5 @@
     final TextSequence ts = new TextSequence(column, 0, family.getLength());
     assertTrue(ts.compareTo(family) == 0);
     assertTrue(ts.equals(family));
-  }
-  
-  public void testSerialize() throws Exception {
-    final Text t = new Text(getName());
-    final TextSequence ts = new TextSequence(t, 1, 3);
-    ByteArrayOutputStream baos = new ByteArrayOutputStream();
-    DataOutputStream dao = new DataOutputStream(baos);
-    ts.write(dao);
-    dao.close();
-    ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
-    DataInputStream dis = new DataInputStream(bais);
-    TextSequence deserializeTs = new TextSequence();
-    deserializeTs.readFields(dis);
-    assertTrue(ts.equals(deserializeTs));
   }
 }