You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hcatalog-commits@incubator.apache.org by to...@apache.org on 2012/03/23 18:14:37 UTC
svn commit: r1304536 - in /incubator/hcatalog/trunk: ./
storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/
storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/
storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/ storage-ha...
Author: toffer
Date: Fri Mar 23 18:14:36 2012
New Revision: 1304536
URL: http://svn.apache.org/viewvc?rev=1304536&view=rev
Log:
HCATALOG-310 Turn current RM implementation into HBase Coprocessor (thw via toffer)
Added:
incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerEndpoint.java
incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerEndpointClient.java
incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerProtocol.java
incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/package-info.java
incubator/hcatalog/trunk/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/TestRevisionManagerEndpoint.java
Modified:
incubator/hcatalog/trunk/CHANGES.txt
incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseRevisionManagerUtil.java
incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManager.java
incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerFactory.java
incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/TableSnapshot.java
incubator/hcatalog/trunk/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/ManyMiniCluster.java
incubator/hcatalog/trunk/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/SkeletonHBaseTest.java
Modified: incubator/hcatalog/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/CHANGES.txt?rev=1304536&r1=1304535&r2=1304536&view=diff
==============================================================================
--- incubator/hcatalog/trunk/CHANGES.txt (original)
+++ incubator/hcatalog/trunk/CHANGES.txt Fri Mar 23 18:14:36 2012
@@ -62,6 +62,8 @@ Release 0.4.0 - Unreleased
HCAT-240. Changes to HCatOutputFormat to make it use SerDes instead of StorageDriver (toffer)
NEW FEATURES
+ HCAT-310 Turn current RM implementation into HBase Coprocessor (thw via toffer)
+
HCAT-334 HCatalog should generate a POM file so it can be deployed to a maven repo (traviscrawford via gates)
HCAT-296 Hcatalog should be able talk to secure hbase server using hbase delegation token (rohini via toffer)
Modified: incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseRevisionManagerUtil.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseRevisionManagerUtil.java?rev=1304536&r1=1304535&r2=1304536&view=diff
==============================================================================
--- incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseRevisionManagerUtil.java (original)
+++ incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseRevisionManagerUtil.java Fri Mar 23 18:14:36 2012
@@ -24,13 +24,11 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Properties;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hive.hbase.HBaseSerDe;
import org.apache.hcatalog.common.HCatConstants;
import org.apache.hcatalog.common.HCatUtil;
@@ -40,7 +38,6 @@ import org.apache.hcatalog.hbase.snapsho
import org.apache.hcatalog.hbase.snapshot.RevisionManagerFactory;
import org.apache.hcatalog.hbase.snapshot.TableSnapshot;
import org.apache.hcatalog.hbase.snapshot.Transaction;
-import org.apache.hcatalog.hbase.snapshot.ZKBasedRevisionManager;
import org.apache.hcatalog.mapreduce.HCatTableInfo;
import org.apache.hcatalog.mapreduce.InputJobInfo;
import org.apache.hcatalog.mapreduce.OutputJobInfo;
@@ -126,37 +123,7 @@ class HBaseRevisionManagerUtil {
* @throws IOException
*/
static RevisionManager getOpenedRevisionManager(Configuration jobConf) throws IOException {
-
- Properties properties = new Properties();
- String zkHostList = jobConf.get(HConstants.ZOOKEEPER_QUORUM);
- int port = jobConf.getInt("hbase.zookeeper.property.clientPort",
- HConstants.DEFAULT_ZOOKEPER_CLIENT_PORT);
-
- if (zkHostList != null) {
- String[] splits = zkHostList.split(",");
- StringBuffer sb = new StringBuffer();
- for (String split : splits) {
- sb.append(split);
- sb.append(':');
- sb.append(port);
- sb.append(',');
- }
-
- sb.deleteCharAt(sb.length() - 1);
- properties.put(ZKBasedRevisionManager.HOSTLIST, sb.toString());
- }
- String dataDir = jobConf.get(ZKBasedRevisionManager.DATADIR);
- if (dataDir != null) {
- properties.put(ZKBasedRevisionManager.DATADIR, dataDir);
- }
- String rmClassName = jobConf.get(
- RevisionManager.REVISION_MGR_IMPL_CLASS,
- ZKBasedRevisionManager.class.getName());
- properties.put(RevisionManager.REVISION_MGR_IMPL_CLASS, rmClassName);
- RevisionManager revisionManger = RevisionManagerFactory
- .getRevisionManager(properties);
- revisionManger.open();
- return revisionManger;
+ return RevisionManagerFactory.getOpenedRevisionManager(jobConf);
}
static void closeRevisionManagerQuietly(RevisionManager rm) {
Modified: incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManager.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManager.java?rev=1304536&r1=1304535&r2=1304536&view=diff
==============================================================================
--- incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManager.java (original)
+++ incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManager.java Fri Mar 23 18:14:36 2012
@@ -25,8 +25,11 @@ import java.util.Properties;
* This interface provides APIs for implementing revision management.
*/
public interface RevisionManager {
-
- public static final String REVISION_MGR_IMPL_CLASS = "revision.manager.impl.class";
+ /**
+ * Version property required by HBase to use this interface
+ * for CoprocessorProtocol / RPC.
+ */
+ public static final long VERSION = 1L; // do not change
/**
* Initialize the revision manager.
Added: incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerEndpoint.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerEndpoint.java?rev=1304536&view=auto
==============================================================================
--- incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerEndpoint.java (added)
+++ incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerEndpoint.java Fri Mar 23 18:14:36 2012
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.hbase.snapshot;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.BaseEndpointCoprocessor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation of RevisionManager as HBase RPC endpoint. This class will control the lifecycle of
+ * and delegate to the actual RevisionManager implementation and make it available as a service
+ * hosted in the HBase region server (instead of running it in the client (storage handler).
+ * In the case of {@link ZKBasedRevisionManager} now only the region servers need write access to
+ * manage revision data.
+ */
+public class RevisionManagerEndpoint extends BaseEndpointCoprocessor implements RevisionManagerProtocol {
+
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(RevisionManagerEndpoint.class.getName());
+ public static final String REVISION_MGR_ENDPOINT_IMPL_CLASS = "revision.manager.endpoint.impl.class";
+
+ private RevisionManager rmImpl = null;
+
+ @Override
+ public void start(CoprocessorEnvironment env) {
+ super.start(env);
+ try {
+ Configuration conf = env.getConfiguration();
+ String className = conf.get(REVISION_MGR_ENDPOINT_IMPL_CLASS,
+ ZKBasedRevisionManager.class.getName());
+ rmImpl = RevisionManagerFactory.getOpenedRevisionManager(className, conf);
+ } catch (IOException e) {
+ LOGGER.error("Failed to initialize revision manager", e);
+ }
+ }
+
+ @Override
+ public void stop(CoprocessorEnvironment env) {
+ if (rmImpl != null) {
+ try {
+ rmImpl.close();
+ } catch (IOException e) {
+ LOGGER.warn("Error closing revision manager.", e);
+ }
+ }
+ super.stop(env);
+ }
+
+ @Override
+ public void initialize(Properties properties) {
+ // do nothing, HBase controls life cycle
+ }
+
+ @Override
+ public void open() throws IOException {
+ // do nothing, HBase controls life cycle
+ }
+
+ @Override
+ public void close() throws IOException {
+ // do nothing, HBase controls life cycle
+ }
+
+ @Override
+ public Transaction beginWriteTransaction(String table, List<String> families)
+ throws IOException {
+ return rmImpl.beginWriteTransaction(table, families);
+ }
+
+ @Override
+ public Transaction beginWriteTransaction(String table,
+ List<String> families, long keepAlive) throws IOException {
+ return rmImpl.beginWriteTransaction(table, families, keepAlive);
+ }
+
+ @Override
+ public void commitWriteTransaction(Transaction transaction)
+ throws IOException {
+ rmImpl.commitWriteTransaction(transaction);
+ }
+
+ @Override
+ public void abortWriteTransaction(Transaction transaction)
+ throws IOException {
+ rmImpl.abortWriteTransaction(transaction);
+ }
+
+ @Override
+ public TableSnapshot createSnapshot(String tableName) throws IOException {
+ return rmImpl.createSnapshot(tableName);
+ }
+
+ @Override
+ public TableSnapshot createSnapshot(String tableName, long revision)
+ throws IOException {
+ return rmImpl.createSnapshot(tableName, revision);
+ }
+
+ @Override
+ public void keepAlive(Transaction transaction) throws IOException {
+ rmImpl.keepAlive(transaction);
+ }
+
+ @Override
+ public List<FamilyRevision> getAbortedWriteTransactions(String table,
+ String columnFamily) throws IOException {
+ return rmImpl.getAbortedWriteTransactions(table, columnFamily);
+ }
+
+}
Added: incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerEndpointClient.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerEndpointClient.java?rev=1304536&view=auto
==============================================================================
--- incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerEndpointClient.java (added)
+++ incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerEndpointClient.java Fri Mar 23 18:14:36 2012
@@ -0,0 +1,97 @@
+package org.apache.hcatalog.hbase.snapshot;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * This class is nothing but a delegate for the enclosed proxy,
+ * which is created upon setting the configuration.
+ */
+public class RevisionManagerEndpointClient implements RevisionManager, Configurable {
+
+ private Configuration conf = null;
+ private RevisionManager rmProxy;
+
+ @Override
+ public Configuration getConf() {
+ return this.conf;
+ }
+
+ @Override
+ public void setConf(Configuration arg0) {
+ this.conf = arg0;
+ }
+
+ @Override
+ public void initialize(Properties properties) {
+ // do nothing
+ }
+
+ @Override
+ public void open() throws IOException {
+ // clone to adjust RPC settings unique to proxy
+ Configuration clonedConf = new Configuration(conf);
+ // conf.set("hbase.ipc.client.connect.max.retries", "0");
+ // conf.setInt(HConstants.HBASE_CLIENT_RPC_MAXATTEMPTS, 1);
+ clonedConf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); // do not retry RPC
+ HTable table = new HTable(clonedConf, HConstants.META_TABLE_NAME);
+ rmProxy = table.coprocessorProxy(RevisionManagerProtocol.class,
+ Bytes.toBytes("anyRow"));
+ rmProxy.open();
+ }
+
+ @Override
+ public void close() throws IOException {
+ rmProxy.close();
+ }
+
+ @Override
+ public Transaction beginWriteTransaction(String table, List<String> families) throws IOException {
+ return rmProxy.beginWriteTransaction(table, families);
+ }
+
+ @Override
+ public Transaction beginWriteTransaction(String table, List<String> families, long keepAlive)
+ throws IOException {
+ return rmProxy.beginWriteTransaction(table, families, keepAlive);
+ }
+
+ @Override
+ public void commitWriteTransaction(Transaction transaction) throws IOException {
+ rmProxy.commitWriteTransaction(transaction);
+ }
+
+ @Override
+ public void abortWriteTransaction(Transaction transaction) throws IOException {
+ rmProxy.abortWriteTransaction(transaction);
+ }
+
+ @Override
+ public List<FamilyRevision> getAbortedWriteTransactions(String table, String columnFamily)
+ throws IOException {
+ return rmProxy.getAbortedWriteTransactions(table, columnFamily);
+ }
+
+ @Override
+ public TableSnapshot createSnapshot(String tableName) throws IOException {
+ return rmProxy.createSnapshot(tableName);
+ }
+
+ @Override
+ public TableSnapshot createSnapshot(String tableName, long revision) throws IOException {
+ return rmProxy.createSnapshot(tableName, revision);
+ }
+
+ @Override
+ public void keepAlive(Transaction transaction) throws IOException {
+ rmProxy.keepAlive(transaction);
+ }
+
+}
Modified: incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerFactory.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerFactory.java?rev=1304536&r1=1304535&r2=1304536&view=diff
==============================================================================
--- incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerFactory.java (original)
+++ incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerFactory.java Fri Mar 23 18:14:36 2012
@@ -20,16 +20,26 @@ package org.apache.hcatalog.hbase.snapsh
import java.io.IOException;
import java.util.Properties;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+
+/**
+ * Utility to instantiate the revision manager (not a true factory actually).
+ * Depends on HBase configuration to resolve ZooKeeper connection (when ZK is used).
+ */
public class RevisionManagerFactory {
- /**
- * Gets an instance of revision manager.
- *
- * @param properties The properties required to created the revision manager.
- * @return the revision manager An instance of revision manager.
- * @throws IOException Signals that an I/O exception has occurred.
- */
- public static RevisionManager getRevisionManager(Properties properties) throws IOException{
+ public static final String REVISION_MGR_IMPL_CLASS = "revision.manager.impl.class";
+
+ /**
+ * Gets an instance of revision manager.
+ *
+ * @param properties The properties required to created the revision manager.
+ * @return the revision manager An instance of revision manager.
+ * @throws IOException Signals that an I/O exception has occurred.
+ */
+ private static RevisionManager getRevisionManager(String className, Properties properties) throws IOException{
RevisionManager revisionMgr;
ClassLoader classLoader = Thread.currentThread()
@@ -37,14 +47,9 @@ public class RevisionManagerFactory {
if (classLoader == null) {
classLoader = RevisionManagerFactory.class.getClassLoader();
}
- String className = properties.getProperty(
- RevisionManager.REVISION_MGR_IMPL_CLASS,
- ZKBasedRevisionManager.class.getName());
try {
-
- @SuppressWarnings("unchecked")
- Class<? extends RevisionManager> revisionMgrClass = (Class<? extends RevisionManager>) Class
- .forName(className, true , classLoader);
+ Class<? extends RevisionManager> revisionMgrClass = Class
+ .forName(className, true , classLoader).asSubclass(RevisionManager.class);
revisionMgr = (RevisionManager) revisionMgrClass.newInstance();
revisionMgr.initialize(properties);
} catch (ClassNotFoundException e) {
@@ -67,4 +72,58 @@ public class RevisionManagerFactory {
return revisionMgr;
}
+ /**
+ * Internally used by endpoint implementation to instantiate from different configuration setting.
+ * @param className
+ * @param conf
+ * @return
+ * @throws IOException
+ */
+ static RevisionManager getOpenedRevisionManager(String className, Configuration conf) throws IOException {
+
+ Properties properties = new Properties();
+ String zkHostList = conf.get(HConstants.ZOOKEEPER_QUORUM);
+ int port = conf.getInt(HConstants.ZOOKEEPER_CLIENT_PORT,
+ HConstants.DEFAULT_ZOOKEPER_CLIENT_PORT);
+
+ if (zkHostList != null) {
+ String[] splits = zkHostList.split(",");
+ StringBuffer sb = new StringBuffer();
+ for (String split : splits) {
+ sb.append(split);
+ sb.append(':');
+ sb.append(port);
+ sb.append(',');
+ }
+
+ sb.deleteCharAt(sb.length() - 1);
+ properties.put(ZKBasedRevisionManager.HOSTLIST, sb.toString());
+ }
+ String dataDir = conf.get(ZKBasedRevisionManager.DATADIR);
+ if (dataDir != null) {
+ properties.put(ZKBasedRevisionManager.DATADIR, dataDir);
+ }
+ RevisionManager revisionMgr = RevisionManagerFactory
+ .getRevisionManager(className, properties);
+ if (revisionMgr instanceof Configurable) {
+ ((Configurable)revisionMgr).setConf(conf);
+ }
+ revisionMgr.open();
+ return revisionMgr;
+ }
+
+ /**
+ * Gets an instance of revision manager which is opened.
+ * The revision manager implementation can be specified as {@link #REVISION_MGR_IMPL_CLASS},
+ * default is {@link ZKBasedRevisionManager}.
+ * @param hbaseConf The HBase configuration.
+ * @return RevisionManager An instance of revision manager.
+ * @throws IOException
+ */
+ public static RevisionManager getOpenedRevisionManager(Configuration hbaseConf) throws IOException {
+ String className = hbaseConf.get(RevisionManagerFactory.REVISION_MGR_IMPL_CLASS,
+ ZKBasedRevisionManager.class.getName());
+ return getOpenedRevisionManager(className, hbaseConf);
+ }
+
}
Added: incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerProtocol.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerProtocol.java?rev=1304536&view=auto
==============================================================================
--- incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerProtocol.java (added)
+++ incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/RevisionManagerProtocol.java Fri Mar 23 18:14:36 2012
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.hbase.snapshot;
+
+import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
+
+/**
+ * Interface marker to implement RevisionManager as Coprocessor.
+ * (needs to extend CoprocessorProtocol)
+ */
+public interface RevisionManagerProtocol extends RevisionManager,
+ CoprocessorProtocol {
+
+}
Modified: incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/TableSnapshot.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/TableSnapshot.java?rev=1304536&r1=1304535&r2=1304536&view=diff
==============================================================================
--- incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/TableSnapshot.java (original)
+++ incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/TableSnapshot.java Fri Mar 23 18:14:36 2012
@@ -17,6 +17,7 @@
*/
package org.apache.hcatalog.hbase.snapshot;
+import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -24,7 +25,7 @@ import java.util.Map;
/**
* The snapshot for a table and a list of column families.
*/
-public class TableSnapshot {
+public class TableSnapshot implements Serializable {
private String name;
@@ -35,6 +36,9 @@ public class TableSnapshot {
public TableSnapshot(String name, Map<String, Long> cfRevMap, long latestRevision) {
this.name = name;
+ if (cfRevMap == null) {
+ throw new IllegalArgumentException("revision map cannot be null");
+ }
this.cfRevisionMap = cfRevMap;
this.latestRevision = latestRevision;
}
Added: incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/package-info.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/package-info.java?rev=1304536&view=auto
==============================================================================
--- incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/package-info.java (added)
+++ incubator/hcatalog/trunk/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/snapshot/package-info.java Fri Mar 23 18:14:36 2012
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Provides a revision manager for data stored in HBase that can be used to implement repeatable reads.
+ * The component is designed to be usable for revision management of data stored in HBase in general,
+ * independent and not limited to HCatalog. It is used by the HCatalog HBase storage handler, implementation depends on HBase 0.92+.
+ * <p>
+ * For more information please see
+ * <a href="https://cwiki.apache.org/confluence/display/HCATALOG/Snapshots+and+Repeatable+reads+for+HBase+Tables">Snapshots and Repeatable reads for HBase Tables</a>.
+ * @since 0.4
+ */
+package org.apache.hcatalog.hbase.snapshot;
Modified: incubator/hcatalog/trunk/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/ManyMiniCluster.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/ManyMiniCluster.java?rev=1304536&r1=1304535&r2=1304536&view=diff
==============================================================================
--- incubator/hcatalog/trunk/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/ManyMiniCluster.java (original)
+++ incubator/hcatalog/trunk/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/ManyMiniCluster.java Fri Mar 23 18:14:36 2012
@@ -254,8 +254,8 @@ public class ManyMiniCluster {
hbaseConf.set("hbase.rootdir", hbaseRoot);
hbaseConf.set("hbase.master", "local");
- hbaseConf.setInt("hbase.zookeeper.property.clientPort", zookeeperPort);
- hbaseConf.set("hbase.zookeeper.quorum", "127.0.0.1");
+ hbaseConf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, zookeeperPort);
+ hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, "127.0.0.1");
hbaseConf.setInt("hbase.master.port", findFreePort());
hbaseConf.setInt("hbase.master.info.port", -1);
hbaseConf.setInt("hbase.regionserver.port", findFreePort());
@@ -306,7 +306,7 @@ public class ManyMiniCluster {
private File workDir;
private int numTaskTrackers = 1;
private JobConf jobConf;
- private HBaseConfiguration hbaseConf;
+ private Configuration hbaseConf;
private HiveConf hiveConf;
private boolean miniMRClusterEnabled = true;
@@ -329,7 +329,7 @@ public class ManyMiniCluster {
return this;
}
- public Builder hbaseConf(HBaseConfiguration hbaseConf) {
+ public Builder hbaseConf(Configuration hbaseConf) {
this.hbaseConf = hbaseConf;
return this;
}
Modified: incubator/hcatalog/trunk/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/SkeletonHBaseTest.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/SkeletonHBaseTest.java?rev=1304536&r1=1304535&r2=1304536&view=diff
==============================================================================
--- incubator/hcatalog/trunk/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/SkeletonHBaseTest.java (original)
+++ incubator/hcatalog/trunk/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/SkeletonHBaseTest.java Fri Mar 23 18:14:36 2012
@@ -18,9 +18,18 @@
package org.apache.hcatalog.hbase;
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
@@ -28,14 +37,6 @@ import org.apache.hadoop.hive.conf.HiveC
import org.junit.AfterClass;
import org.junit.BeforeClass;
-import java.io.File;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-
/**
* Base class for HBase Tests which need a mini cluster instance
*/
@@ -48,6 +49,12 @@ public abstract class SkeletonHBaseTest
protected static Map<String,Context> contextMap = new HashMap<String,Context>();
protected static Set<String> tableNames = new HashSet<String>();
+ /**
+ * Allow tests to alter the default MiniCluster configuration.
+ * (requires static initializer block as all setup here is static)
+ */
+ protected static Configuration testConf = null;
+
protected void createTable(String tableName, String[] families) {
try {
HBaseAdmin admin = new HBaseAdmin(getHbaseConf());
@@ -76,6 +83,7 @@ public abstract class SkeletonHBaseTest
return name;
}
+
/**
* startup an hbase cluster instance before a test suite runs
*/
@@ -173,9 +181,13 @@ public abstract class SkeletonHBaseTest
public void start() {
if(usageCount++ == 0) {
- cluster = ManyMiniCluster.create(new File(testDir)).build();
+ ManyMiniCluster.Builder b = ManyMiniCluster.create(new File(testDir));
+ if (testConf != null) {
+ b.hbaseConf(HBaseConfiguration.create(testConf));
+ }
+ cluster = b.build();
cluster.start();
- hbaseConf = cluster.getHBaseConf();
+ this.hbaseConf = cluster.getHBaseConf();
jobConf = cluster.getJobConf();
fileSystem = cluster.getFileSystem();
hiveConf = cluster.getHiveConf();
Added: incubator/hcatalog/trunk/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/TestRevisionManagerEndpoint.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/TestRevisionManagerEndpoint.java?rev=1304536&view=auto
==============================================================================
--- incubator/hcatalog/trunk/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/TestRevisionManagerEndpoint.java (added)
+++ incubator/hcatalog/trunk/storage-handlers/hbase/src/test/org/apache/hcatalog/hbase/snapshot/TestRevisionManagerEndpoint.java Fri Mar 23 18:14:36 2012
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.hbase.snapshot;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.commons.lang.builder.ToStringBuilder;
+import org.apache.commons.lang.builder.ToStringStyle;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hcatalog.hbase.SkeletonHBaseTest;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestRevisionManagerEndpoint extends SkeletonHBaseTest {
+
+ static {
+ // test case specific mini cluster settings
+ testConf = new Configuration(false);
+ testConf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
+ "org.apache.hcatalog.hbase.snapshot.RevisionManagerEndpoint",
+ "org.apache.hadoop.hbase.coprocessor.GenericEndpoint");
+ testConf.set(RevisionManagerEndpoint.REVISION_MGR_ENDPOINT_IMPL_CLASS, MockRM.class.getName());
+ }
+
+ /**
+ * Mock implementation to test the protocol/serialization
+ */
+ public static class MockRM implements RevisionManager {
+
+ private static class Invocation {
+ Invocation(String methodName, Object ret, Object... args) {
+ this.methodName = methodName;
+ this.args = args;
+ this.ret = ret;
+ }
+
+ String methodName;
+ Object[] args;
+ Object ret;
+
+ private static boolean equals(Object obj1, Object obj2) {
+ if (obj1 == obj2) return true;
+ if (obj1 == null || obj2 == null) return false;
+ if (obj1 instanceof Transaction || obj1 instanceof TableSnapshot) {
+ return obj1.toString().equals(obj2.toString());
+ }
+ return obj1.equals(obj2);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ Invocation other = (Invocation)obj;
+ if (this == other) return true;
+ if (other == null) return false;
+ if (this.args != other.args) {
+ if (this.args == null || other.args == null) return false;
+ if (this.args.length != other.args.length) return false;
+ for (int i=0; i<args.length; i++) {
+ if (!equals(this.args[i], other.args[i])) return false;
+ }
+ }
+ return equals(this.ret, other.ret);
+ }
+
+ @Override
+ public String toString() {
+ return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE).
+ append("method", this.methodName).
+ append("args", this.args).
+ append("returns", this.ret).
+ toString();
+ }
+ }
+
+ final static String DEFAULT_INSTANCE = "default";
+ final static Map<String, MockRM> INSTANCES = new ConcurrentHashMap<String, MockRM>();
+ Invocation lastCall;
+ boolean isOpen = false;
+
+ private <T extends Object> T recordCall(T result, Object...args) {
+ StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
+ lastCall = new Invocation(stackTrace[2].getMethodName(), result, args);
+ return result;
+ }
+
+ @Override
+ public void initialize(Properties properties) {
+ INSTANCES.put("default", this);
+ }
+
+ @Override
+ public void open() throws IOException {
+ isOpen = true;
+ }
+
+ @Override
+ public void close() throws IOException {
+ isOpen = false;
+ }
+
+ @Override
+ public Transaction beginWriteTransaction(String table,
+ List<String> families) throws IOException {
+ return recordCall(null, table, families);
+ }
+
+ @Override
+ public Transaction beginWriteTransaction(String table,
+ List<String> families, long keepAlive) throws IOException {
+ return recordCall(null, table, families, keepAlive);
+ }
+
+ @Override
+ public void commitWriteTransaction(Transaction transaction)
+ throws IOException {
+ }
+
+ @Override
+ public void abortWriteTransaction(Transaction transaction)
+ throws IOException {
+ }
+
+ @Override
+ public List<FamilyRevision> getAbortedWriteTransactions(String table,
+ String columnFamily) throws IOException {
+ return null;
+ }
+
+ @Override
+ public TableSnapshot createSnapshot(String tableName)
+ throws IOException {
+ return null;
+ }
+
+ @Override
+ public TableSnapshot createSnapshot(String tableName, long revision)
+ throws IOException {
+ TableSnapshot ret = new TableSnapshot(tableName, new HashMap<String, Long>(), revision);
+ return recordCall(ret, tableName, revision);
+ }
+
+ @Override
+ public void keepAlive(Transaction transaction) throws IOException {
+ recordCall(null, transaction);
+ }
+ }
+
+ @Test
+ public void testRevisionManagerProtocol() throws Throwable {
+
+ Configuration conf = getHbaseConf();
+ RevisionManager rm = RevisionManagerFactory.getOpenedRevisionManager(
+ RevisionManagerEndpointClient.class.getName(), conf);
+
+ MockRM mockImpl = MockRM.INSTANCES.get(MockRM.DEFAULT_INSTANCE);
+ Assert.assertNotNull(mockImpl);
+ Assert.assertTrue(mockImpl.isOpen);
+
+ Transaction t = new Transaction("t1", Arrays.asList("f1", "f2"), 0, 0);
+ MockRM.Invocation call = new MockRM.Invocation("keepAlive", null, t);
+ rm.keepAlive(t);
+ Assert.assertEquals(call.methodName, call, mockImpl.lastCall);
+
+ t = new Transaction("t2", Arrays.asList("f21", "f22"), 0, 0);
+ call = new MockRM.Invocation("beginWriteTransaction", null, t.getTableName(), t.getColumnFamilies());
+ call.ret = rm.beginWriteTransaction(t.getTableName(), t.getColumnFamilies());
+ Assert.assertEquals(call.methodName, call, mockImpl.lastCall);
+
+ call = new MockRM.Invocation("createSnapshot", null, "t3", 1L);
+ call.ret = rm.createSnapshot("t3", 1);
+ Assert.assertEquals(call.methodName, call, mockImpl.lastCall);
+
+ }
+
+}