You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hcatalog-commits@incubator.apache.org by to...@apache.org on 2012/03/23 18:14:37 UTC

svn commit: r1304536 - in /incubator/hcatalog/trunk: ./ storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/ storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/ storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/ storage-ha...

Author: toffer
Date: Fri Mar 23 18:14:36 2012
New Revision: 1304536

URL: http://svn.apache.org/viewvc?rev=1304536&view=rev
Log:
HCATALOG-310 Turn current RM implementation into HBase Coprocessor (thw via toffer)

Added:
    incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerEndpoint.java
    incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerEndpointClient.java
    incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerProtocol.java
    incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/package-info.java
    incubator/hcatalog/trunk/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/TestRevisionManagerEndpoint.java
Modified:
    incubator/hcatalog/trunk/CHANGES.txt
    incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseRevisionManagerUtil.java
    incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManager.java
    incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerFactory.java
    incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/TableSnapshot.java
    incubator/hcatalog/trunk/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/ManyMiniCluster.java
    incubator/hcatalog/trunk/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/SkeletonHBaseTest.java

Modified: incubator/hcatalog/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/CHANGES.txt?rev=1304536&r1=1304535&r2=1304536&view=diff
==============================================================================
--- incubator/hcatalog/trunk/CHANGES.txt (original)
+++ incubator/hcatalog/trunk/CHANGES.txt Fri Mar 23 18:14:36 2012
@@ -62,6 +62,8 @@ Release 0.4.0 - Unreleased
   HCAT-240. Changes to HCatOutputFormat to make it use SerDes instead of StorageDriver (toffer)
 
   NEW FEATURES
+  HCAT-310 Turn current RM implementation into HBase Coprocessor (thw via toffer)
+
   HCAT-334 HCatalog should generate a POM file so it can be deployed to a maven repo (traviscrawford via gates)
 
   HCAT-296 Hcatalog should be able talk to secure hbase server using hbase delegation token (rohini via toffer)

Modified: incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseRevisionManagerUtil.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseRevisionManagerUtil.java?rev=1304536&r1=1304535&r2=1304536&view=diff
==============================================================================
--- incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseRevisionManagerUtil.java (original)
+++ incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseRevisionManagerUtil.java Fri Mar 23 18:14:36 2012
@@ -24,13 +24,11 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Properties;
 import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hive.hbase.HBaseSerDe;
 import org.apache.hcatalog.common.HCatConstants;
 import org.apache.hcatalog.common.HCatUtil;
@@ -40,7 +38,6 @@ import org.apache.hcatalog.hbase.snapsho
 import org.apache.hcatalog.hbase.snapshot.RevisionManagerFactory;
 import org.apache.hcatalog.hbase.snapshot.TableSnapshot;
 import org.apache.hcatalog.hbase.snapshot.Transaction;
-import org.apache.hcatalog.hbase.snapshot.ZKBasedRevisionManager;
 import org.apache.hcatalog.mapreduce.HCatTableInfo;
 import org.apache.hcatalog.mapreduce.InputJobInfo;
 import org.apache.hcatalog.mapreduce.OutputJobInfo;
@@ -126,37 +123,7 @@ class HBaseRevisionManagerUtil {
      * @throws IOException
      */
     static RevisionManager getOpenedRevisionManager(Configuration jobConf) throws IOException {
-
-        Properties properties = new Properties();
-        String zkHostList = jobConf.get(HConstants.ZOOKEEPER_QUORUM);
-        int port = jobConf.getInt("hbase.zookeeper.property.clientPort",
-                HConstants.DEFAULT_ZOOKEPER_CLIENT_PORT);
-
-        if (zkHostList != null) {
-            String[] splits = zkHostList.split(",");
-            StringBuffer sb = new StringBuffer();
-            for (String split : splits) {
-                sb.append(split);
-                sb.append(':');
-                sb.append(port);
-                sb.append(',');
-            }
-
-            sb.deleteCharAt(sb.length() - 1);
-            properties.put(ZKBasedRevisionManager.HOSTLIST, sb.toString());
-        }
-        String dataDir = jobConf.get(ZKBasedRevisionManager.DATADIR);
-        if (dataDir != null) {
-            properties.put(ZKBasedRevisionManager.DATADIR, dataDir);
-        }
-        String rmClassName = jobConf.get(
-                RevisionManager.REVISION_MGR_IMPL_CLASS,
-                ZKBasedRevisionManager.class.getName());
-        properties.put(RevisionManager.REVISION_MGR_IMPL_CLASS, rmClassName);
-        RevisionManager revisionManger = RevisionManagerFactory
-                .getRevisionManager(properties);
-        revisionManger.open();
-        return revisionManger;
+      return RevisionManagerFactory.getOpenedRevisionManager(jobConf);
     }
 
     static void closeRevisionManagerQuietly(RevisionManager rm) {

Modified: incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManager.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManager.java?rev=1304536&r1=1304535&r2=1304536&view=diff
==============================================================================
--- incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManager.java (original)
+++ incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManager.java Fri Mar 23 18:14:36 2012
@@ -25,8 +25,11 @@ import java.util.Properties;
  * This interface provides APIs for implementing revision management.
  */
 public interface RevisionManager {
-
-    public static final String REVISION_MGR_IMPL_CLASS = "revision.manager.impl.class";
+    /**
+     * Version property required by HBase to use this interface
+     * for CoprocessorProtocol / RPC.
+     */
+    public static final long VERSION = 1L; // do not change
 
     /**
      * Initialize the revision manager.

Added: incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerEndpoint.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerEndpoint.java?rev=1304536&view=auto
==============================================================================
--- incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerEndpoint.java (added)
+++ incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerEndpoint.java Fri Mar 23 18:14:36 2012
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.hbase.snapshot;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.BaseEndpointCoprocessor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation of RevisionManager as HBase RPC endpoint. This class will control the lifecycle of
+ * and delegate to the actual RevisionManager implementation and make it available as a service
+ * hosted in the HBase region server (instead of running it in the client (storage handler).
+ * In the case of {@link ZKBasedRevisionManager} now only the region servers need write access to
+ * manage revision data.
+ */
+public class RevisionManagerEndpoint extends BaseEndpointCoprocessor implements RevisionManagerProtocol {
+
+  private static final Logger LOGGER =
+		      LoggerFactory.getLogger(RevisionManagerEndpoint.class.getName());
+  public static final String REVISION_MGR_ENDPOINT_IMPL_CLASS = "revision.manager.endpoint.impl.class";
+  
+  private RevisionManager rmImpl = null;
+
+  @Override
+  public void start(CoprocessorEnvironment env) {
+    super.start(env);
+    try {
+      Configuration conf = env.getConfiguration();
+      String className = conf.get(REVISION_MGR_ENDPOINT_IMPL_CLASS,
+          ZKBasedRevisionManager.class.getName());
+      rmImpl = RevisionManagerFactory.getOpenedRevisionManager(className, conf);
+    } catch (IOException e) {
+      LOGGER.error("Failed to initialize revision manager", e);
+    }
+  }
+
+  @Override
+  public void stop(CoprocessorEnvironment env) {
+    if (rmImpl != null) {
+      try {
+        rmImpl.close();
+      } catch (IOException e) {
+        LOGGER.warn("Error closing revision manager.", e);
+      }
+    }
+    super.stop(env);
+  }
+
+  @Override
+  public void initialize(Properties properties) {
+    // do nothing, HBase controls life cycle
+  }
+
+  @Override
+  public void open() throws IOException {
+    // do nothing, HBase controls life cycle
+  }
+
+  @Override
+  public void close() throws IOException {
+    // do nothing, HBase controls life cycle
+  }
+
+  @Override
+  public Transaction beginWriteTransaction(String table, List<String> families)
+      throws IOException {
+    return rmImpl.beginWriteTransaction(table, families);
+  }
+
+  @Override
+  public Transaction beginWriteTransaction(String table,
+      List<String> families, long keepAlive) throws IOException {
+    return rmImpl.beginWriteTransaction(table, families, keepAlive);
+  }
+
+  @Override
+  public void commitWriteTransaction(Transaction transaction)
+      throws IOException {
+    rmImpl.commitWriteTransaction(transaction);
+  }
+
+  @Override
+  public void abortWriteTransaction(Transaction transaction)
+      throws IOException {
+    rmImpl.abortWriteTransaction(transaction);
+  }
+
+  @Override
+  public TableSnapshot createSnapshot(String tableName) throws IOException {
+    return rmImpl.createSnapshot(tableName);
+  }
+
+  @Override
+  public TableSnapshot createSnapshot(String tableName, long revision)
+      throws IOException {
+    return rmImpl.createSnapshot(tableName, revision);
+  }
+
+  @Override
+  public void keepAlive(Transaction transaction) throws IOException {
+    rmImpl.keepAlive(transaction);
+  }
+
+  @Override
+  public List<FamilyRevision> getAbortedWriteTransactions(String table,
+      String columnFamily) throws IOException {
+    return rmImpl.getAbortedWriteTransactions(table, columnFamily);
+  }
+
+}

Added: incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerEndpointClient.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerEndpointClient.java?rev=1304536&view=auto
==============================================================================
--- incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerEndpointClient.java (added)
+++ incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerEndpointClient.java Fri Mar 23 18:14:36 2012
@@ -0,0 +1,97 @@
+package org.apache.hcatalog.hbase.snapshot;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * This class is nothing but a delegate for the enclosed proxy,
+ * which is created upon setting the configuration.
+ */
+public class RevisionManagerEndpointClient implements RevisionManager, Configurable {
+
+  private Configuration conf = null;
+  private RevisionManager rmProxy;
+
+  @Override
+  public Configuration getConf() {
+    return this.conf;
+  }
+
+  @Override
+  public void setConf(Configuration arg0) {
+    this.conf = arg0;
+  }
+
+  @Override
+  public void initialize(Properties properties) {
+    // do nothing
+  }
+
+  @Override
+  public void open() throws IOException {
+    // clone to adjust RPC settings unique to proxy
+    Configuration clonedConf = new Configuration(conf);
+    // conf.set("hbase.ipc.client.connect.max.retries", "0");
+    // conf.setInt(HConstants.HBASE_CLIENT_RPC_MAXATTEMPTS, 1);
+    clonedConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); // do not retry RPC
+    HTable table = new HTable(clonedConf, HConstants.META_TABLE_NAME);
+    rmProxy = table.coprocessorProxy(RevisionManagerProtocol.class,
+        Bytes.toBytes("anyRow"));
+    rmProxy.open();
+  }
+
+  @Override
+  public void close() throws IOException {
+    rmProxy.close();
+  }
+
+  @Override
+  public Transaction beginWriteTransaction(String table, List<String> families) throws IOException {
+    return rmProxy.beginWriteTransaction(table, families);
+  }
+
+  @Override
+  public Transaction beginWriteTransaction(String table, List<String> families, long keepAlive)
+      throws IOException {
+    return rmProxy.beginWriteTransaction(table, families, keepAlive);
+  }
+
+  @Override
+  public void commitWriteTransaction(Transaction transaction) throws IOException {
+    rmProxy.commitWriteTransaction(transaction);
+  }
+
+  @Override
+  public void abortWriteTransaction(Transaction transaction) throws IOException {
+    rmProxy.abortWriteTransaction(transaction);
+  }
+
+  @Override
+  public List<FamilyRevision> getAbortedWriteTransactions(String table, String columnFamily)
+      throws IOException {
+    return rmProxy.getAbortedWriteTransactions(table, columnFamily);
+  }
+
+  @Override
+  public TableSnapshot createSnapshot(String tableName) throws IOException {
+    return rmProxy.createSnapshot(tableName);
+  }
+
+  @Override
+  public TableSnapshot createSnapshot(String tableName, long revision) throws IOException {
+    return rmProxy.createSnapshot(tableName, revision);
+  }
+
+  @Override
+  public void keepAlive(Transaction transaction) throws IOException {
+    rmProxy.keepAlive(transaction);
+  }
+
+}

Modified: incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerFactory.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerFactory.java?rev=1304536&r1=1304535&r2=1304536&view=diff
==============================================================================
--- incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerFactory.java (original)
+++ incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerFactory.java Fri Mar 23 18:14:36 2012
@@ -20,16 +20,26 @@ package org.apache.hcatalog.hbase.snapsh
 import java.io.IOException;
 import java.util.Properties;
 
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+
+/**
+ * Utility to instantiate the revision manager (not a true factory actually).
+ * Depends on HBase configuration to resolve ZooKeeper connection (when ZK is used).
+ */
 public class RevisionManagerFactory {
 
-   /**
-    * Gets an instance of revision manager.
-    *
-    * @param properties The properties required to created the revision manager.
-    * @return the revision manager An instance of revision manager.
-    * @throws IOException Signals that an I/O exception has occurred.
-    */
-   public static RevisionManager getRevisionManager(Properties properties) throws IOException{
+  public static final String REVISION_MGR_IMPL_CLASS = "revision.manager.impl.class";
+
+  /**
+   * Gets an instance of revision manager.
+   *
+   * @param properties The properties required to created the revision manager.
+   * @return the revision manager An instance of revision manager.
+   * @throws IOException Signals that an I/O exception has occurred.
+   */
+   private static RevisionManager getRevisionManager(String className, Properties properties) throws IOException{
 
         RevisionManager revisionMgr;
         ClassLoader classLoader = Thread.currentThread()
@@ -37,14 +47,9 @@ public class RevisionManagerFactory {
         if (classLoader == null) {
             classLoader = RevisionManagerFactory.class.getClassLoader();
         }
-        String className = properties.getProperty(
-                RevisionManager.REVISION_MGR_IMPL_CLASS,
-                ZKBasedRevisionManager.class.getName());
         try {
-
-            @SuppressWarnings("unchecked")
-            Class<? extends RevisionManager> revisionMgrClass = (Class<? extends RevisionManager>) Class
-                    .forName(className, true , classLoader);
+            Class<? extends RevisionManager> revisionMgrClass = Class
+                    .forName(className, true , classLoader).asSubclass(RevisionManager.class);
             revisionMgr = (RevisionManager) revisionMgrClass.newInstance();
             revisionMgr.initialize(properties);
         } catch (ClassNotFoundException e) {
@@ -67,4 +72,58 @@ public class RevisionManagerFactory {
         return revisionMgr;
     }
 
+   /**
+    * Internally used by endpoint implementation to instantiate from different configuration setting.
+    * @param className
+    * @param conf
+    * @return
+    * @throws IOException
+    */
+   static RevisionManager getOpenedRevisionManager(String className, Configuration conf) throws IOException {
+
+       Properties properties = new Properties();
+       String zkHostList = conf.get(HConstants.ZOOKEEPER_QUORUM);
+       int port = conf.getInt(HConstants.ZOOKEEPER_CLIENT_PORT,
+               HConstants.DEFAULT_ZOOKEPER_CLIENT_PORT);
+
+       if (zkHostList != null) {
+           String[] splits = zkHostList.split(",");
+           StringBuffer sb = new StringBuffer();
+           for (String split : splits) {
+               sb.append(split);
+               sb.append(':');
+               sb.append(port);
+               sb.append(',');
+           }
+
+           sb.deleteCharAt(sb.length() - 1);
+           properties.put(ZKBasedRevisionManager.HOSTLIST, sb.toString());
+       }
+       String dataDir = conf.get(ZKBasedRevisionManager.DATADIR);
+       if (dataDir != null) {
+           properties.put(ZKBasedRevisionManager.DATADIR, dataDir);
+       }
+       RevisionManager revisionMgr = RevisionManagerFactory
+               .getRevisionManager(className, properties);
+       if (revisionMgr instanceof Configurable) {
+         ((Configurable)revisionMgr).setConf(conf);
+       }
+       revisionMgr.open();
+       return revisionMgr;
+   }
+
+   /**
+    * Gets an instance of revision manager which is opened.
+    * The revision manager implementation can be specified as {@link #REVISION_MGR_IMPL_CLASS},
+    * default is {@link ZKBasedRevisionManager}.
+    * @param hbaseConf The HBase configuration.
+    * @return RevisionManager An instance of revision manager.
+    * @throws IOException
+    */
+   public static RevisionManager getOpenedRevisionManager(Configuration hbaseConf) throws IOException {
+     String className = hbaseConf.get(RevisionManagerFactory.REVISION_MGR_IMPL_CLASS,
+         ZKBasedRevisionManager.class.getName());
+     return getOpenedRevisionManager(className, hbaseConf);
+   }
+
 }

Added: incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerProtocol.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerProtocol.java?rev=1304536&view=auto
==============================================================================
--- incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerProtocol.java (added)
+++ incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerProtocol.java Fri Mar 23 18:14:36 2012
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.hbase.snapshot;
+
+import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
+
+/**
+ * Interface marker to implement RevisionManager as Coprocessor.
+ * (needs to extend CoprocessorProtocol)
+ */
+public interface RevisionManagerProtocol extends RevisionManager,
+    CoprocessorProtocol {
+
+}

Modified: incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/TableSnapshot.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/TableSnapshot.java?rev=1304536&r1=1304535&r2=1304536&view=diff
==============================================================================
--- incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/TableSnapshot.java (original)
+++ incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/TableSnapshot.java Fri Mar 23 18:14:36 2012
@@ -17,6 +17,7 @@
  */
 package org.apache.hcatalog.hbase.snapshot;
 
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -24,7 +25,7 @@ import java.util.Map;
 /**
  * The snapshot for a table and a list of column families.
  */
-public class TableSnapshot {
+public class TableSnapshot implements Serializable {
 
     private String name;
 
@@ -35,6 +36,9 @@ public class TableSnapshot {
 
     public TableSnapshot(String name, Map<String, Long> cfRevMap, long latestRevision) {
         this.name = name;
+        if (cfRevMap == null) {
+          throw new IllegalArgumentException("revision map cannot be null");
+        }
         this.cfRevisionMap = cfRevMap;
         this.latestRevision = latestRevision;
     }

Added: incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/package-info.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/package-info.java?rev=1304536&view=auto
==============================================================================
--- incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/package-info.java (added)
+++ incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/package-info.java Fri Mar 23 18:14:36 2012
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Provides a revision manager for data stored in HBase that can be used to implement repeatable reads.
+ * The component is designed to be usable for revision management of data stored in HBase in general,
+ * independent and not limited to HCatalog. It is used by the HCatalog HBase storage handler, implementation depends on HBase 0.92+.
+ * <p>
+ * For more information please see 
+ * <a href="https://cwiki.apache.org/confluence/display/HCATALOG/Snapshots+and+Repeatable+reads+for+HBase+Tables">Snapshots and Repeatable reads for HBase Tables</a>.
+ * @since 0.4
+ */
+package org.apache.hcatalog.hbase.snapshot;

Modified: incubator/hcatalog/trunk/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/ManyMiniCluster.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/ManyMiniCluster.java?rev=1304536&r1=1304535&r2=1304536&view=diff
==============================================================================
--- incubator/hcatalog/trunk/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/ManyMiniCluster.java (original)
+++ incubator/hcatalog/trunk/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/ManyMiniCluster.java Fri Mar 23 18:14:36 2012
@@ -254,8 +254,8 @@ public class ManyMiniCluster {
 
             hbaseConf.set("hbase.rootdir", hbaseRoot);
             hbaseConf.set("hbase.master", "local");
-            hbaseConf.setInt("hbase.zookeeper.property.clientPort", zookeeperPort);
-            hbaseConf.set("hbase.zookeeper.quorum", "127.0.0.1");
+            hbaseConf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, zookeeperPort);
+            hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, "127.0.0.1");
             hbaseConf.setInt("hbase.master.port", findFreePort());
             hbaseConf.setInt("hbase.master.info.port", -1);
             hbaseConf.setInt("hbase.regionserver.port", findFreePort());
@@ -306,7 +306,7 @@ public class ManyMiniCluster {
         private File workDir;
         private int numTaskTrackers = 1;
         private JobConf jobConf;
-        private HBaseConfiguration hbaseConf;
+        private Configuration hbaseConf;
         private HiveConf hiveConf;
 
         private boolean miniMRClusterEnabled = true;
@@ -329,7 +329,7 @@ public class ManyMiniCluster {
             return this;
         }
 
-        public Builder hbaseConf(HBaseConfiguration hbaseConf) {
+        public Builder hbaseConf(Configuration hbaseConf) {
             this.hbaseConf = hbaseConf;
             return this;
         }

Modified: incubator/hcatalog/trunk/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/SkeletonHBaseTest.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/SkeletonHBaseTest.java?rev=1304536&r1=1304535&r2=1304536&view=diff
==============================================================================
--- incubator/hcatalog/trunk/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/SkeletonHBaseTest.java (original)
+++ incubator/hcatalog/trunk/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/SkeletonHBaseTest.java Fri Mar 23 18:14:36 2012
@@ -18,9 +18,18 @@
 
 package org.apache.hcatalog.hbase;
 
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
@@ -28,14 +37,6 @@ import org.apache.hadoop.hive.conf.HiveC
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 
-import java.io.File;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-
 /**
  * Base class for HBase Tests which need a mini cluster instance
  */
@@ -48,6 +49,12 @@ public abstract class SkeletonHBaseTest 
     protected static Map<String,Context> contextMap = new HashMap<String,Context>();
     protected static Set<String> tableNames = new HashSet<String>();
 
+    /**
+     * Allow tests to alter the default MiniCluster configuration.
+     * (requires static initializer block as all setup here is static)
+     */
+    protected static Configuration testConf = null;
+
     protected void createTable(String tableName, String[] families) {
         try {
             HBaseAdmin admin = new HBaseAdmin(getHbaseConf());
@@ -76,6 +83,7 @@ public abstract class SkeletonHBaseTest 
         return name;
     }
 
+
     /**
      * startup an hbase cluster instance before a test suite runs
      */
@@ -173,9 +181,13 @@ public abstract class SkeletonHBaseTest 
 
         public void start() {
             if(usageCount++ == 0) {
-                cluster = ManyMiniCluster.create(new File(testDir)).build();
+            	ManyMiniCluster.Builder b = ManyMiniCluster.create(new File(testDir));
+                if (testConf != null) {
+                   b.hbaseConf(HBaseConfiguration.create(testConf));
+                }
+                cluster = b.build();
                 cluster.start();
-                hbaseConf = cluster.getHBaseConf();
+                this.hbaseConf = cluster.getHBaseConf();
                 jobConf = cluster.getJobConf();
                 fileSystem = cluster.getFileSystem();
                 hiveConf = cluster.getHiveConf();

Added: incubator/hcatalog/trunk/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/TestRevisionManagerEndpoint.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/TestRevisionManagerEndpoint.java?rev=1304536&view=auto
==============================================================================
--- incubator/hcatalog/trunk/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/TestRevisionManagerEndpoint.java (added)
+++ incubator/hcatalog/trunk/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/TestRevisionManagerEndpoint.java Fri Mar 23 18:14:36 2012
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.hbase.snapshot;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.commons.lang.builder.ToStringBuilder;
+import org.apache.commons.lang.builder.ToStringStyle;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hcatalog.hbase.SkeletonHBaseTest;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestRevisionManagerEndpoint extends SkeletonHBaseTest {
+
+  static {
+    // test case specific mini cluster settings
+    testConf = new Configuration(false);
+    testConf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
+        "org.apache.hcatalog.hbase.snapshot.RevisionManagerEndpoint",
+        "org.apache.hadoop.hbase.coprocessor.GenericEndpoint");
+    testConf.set(RevisionManagerEndpoint.REVISION_MGR_ENDPOINT_IMPL_CLASS, MockRM.class.getName());
+  }
+
+  /**
+   * Mock implementation to test the protocol/serialization
+   */
+  public static class MockRM implements RevisionManager {
+
+    private static class Invocation {
+      Invocation(String methodName, Object ret, Object... args) {
+          this.methodName = methodName;
+          this.args = args;
+          this.ret = ret;
+      }
+
+      String methodName;
+      Object[] args;
+      Object ret;
+
+      private static boolean equals(Object obj1, Object obj2) {
+        if (obj1 == obj2) return true;
+        if (obj1 == null || obj2 == null) return false;
+        if (obj1 instanceof Transaction || obj1 instanceof TableSnapshot) {
+          return obj1.toString().equals(obj2.toString());
+        }
+        return obj1.equals(obj2);
+      }
+
+      @Override
+      public boolean equals(Object obj) {
+        Invocation other = (Invocation)obj;
+        if (this == other) return true;
+        if (other == null) return false;
+        if (this.args != other.args) {
+          if (this.args == null || other.args == null) return false;
+          if (this.args.length != other.args.length) return false;
+          for (int i=0; i<args.length; i++) {
+            if (!equals(this.args[i], other.args[i])) return false;
+          }
+        }
+        return equals(this.ret, other.ret);
+      }
+
+      @Override
+      public String toString() {
+        return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE).
+            append("method", this.methodName).
+            append("args", this.args).
+            append("returns", this.ret).
+            toString();
+      }
+    }
+
+    final static String DEFAULT_INSTANCE = "default";
+    final static Map<String, MockRM> INSTANCES = new ConcurrentHashMap<String, MockRM>();
+    Invocation lastCall;
+    boolean isOpen = false;
+
+    private <T extends Object> T recordCall(T result, Object...args) {
+      StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
+      lastCall = new Invocation(stackTrace[2].getMethodName(), result, args);
+      return result;
+    }
+
+    @Override
+    public void initialize(Properties properties) {
+      INSTANCES.put("default", this);
+    }
+
+    @Override
+    public void open() throws IOException {
+      isOpen = true;
+    }
+
+    @Override
+    public void close() throws IOException {
+      isOpen = false;
+    }
+
+    @Override
+    public Transaction beginWriteTransaction(String table,
+        List<String> families) throws IOException {
+      return recordCall(null, table, families);
+    }
+
+    @Override
+    public Transaction beginWriteTransaction(String table,
+      List<String> families, long keepAlive) throws IOException {
+      return recordCall(null, table, families, keepAlive);
+    }
+
+    @Override
+    public void commitWriteTransaction(Transaction transaction)
+        throws IOException {
+    }
+
+    @Override
+    public void abortWriteTransaction(Transaction transaction)
+        throws IOException {
+    }
+
+    @Override
+    public List<FamilyRevision> getAbortedWriteTransactions(String table,
+        String columnFamily) throws IOException {
+      return null;
+    }
+
+    @Override
+    public TableSnapshot createSnapshot(String tableName)
+        throws IOException {
+      return null;
+    }
+
+    @Override
+    public TableSnapshot createSnapshot(String tableName, long revision)
+        throws IOException {
+      TableSnapshot ret = new TableSnapshot(tableName, new HashMap<String, Long>(), revision);
+      return recordCall(ret, tableName, revision);
+    }
+
+    @Override
+    public void keepAlive(Transaction transaction) throws IOException {
+      recordCall(null, transaction);
+    }
+  }
+
+  @Test
+  public void testRevisionManagerProtocol() throws Throwable {
+
+    Configuration conf = getHbaseConf();
+    RevisionManager rm = RevisionManagerFactory.getOpenedRevisionManager(
+        RevisionManagerEndpointClient.class.getName(), conf);
+
+    MockRM mockImpl = MockRM.INSTANCES.get(MockRM.DEFAULT_INSTANCE);
+    Assert.assertNotNull(mockImpl);
+    Assert.assertTrue(mockImpl.isOpen);
+
+    Transaction t = new Transaction("t1", Arrays.asList("f1", "f2"), 0, 0);
+    MockRM.Invocation call = new MockRM.Invocation("keepAlive", null, t);
+    rm.keepAlive(t);
+    Assert.assertEquals(call.methodName, call, mockImpl.lastCall);
+
+    t = new Transaction("t2", Arrays.asList("f21", "f22"), 0, 0);
+    call = new MockRM.Invocation("beginWriteTransaction", null, t.getTableName(),  t.getColumnFamilies());
+    call.ret = rm.beginWriteTransaction(t.getTableName(), t.getColumnFamilies());
+    Assert.assertEquals(call.methodName, call, mockImpl.lastCall);
+
+    call = new MockRM.Invocation("createSnapshot", null, "t3", 1L);
+    call.ret = rm.createSnapshot("t3", 1);
+    Assert.assertEquals(call.methodName, call, mockImpl.lastCall);
+
+  }
+
+}