Posted to commits@hbase.apache.org by jd...@apache.org on 2012/12/19 23:45:41 UTC

svn commit: r1424192 - in /hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase: client/Mutation.java mapreduce/Import.java

Author: jdcryans
Date: Wed Dec 19 22:45:41 2012
New Revision: 1424192

URL: http://svn.apache.org/viewvc?rev=1424192&view=rev
Log:
HBASE-7158  Allow CopyTable to identify the source cluster (for replication scenarios)

Modified:
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/client/Mutation.java
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java
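
For orientation: the patch makes the Importer mapper stamp every Put and Delete it emits with the UUID of the cluster its job configuration points at. Per the JIRA title, the intent is that CopyTable (which drives this same mapper) marks copied edits with the source cluster's UUID, so a destination cluster that replicates back to the source can recognize those edits and avoid shipping them back. The sketch below mirrors the lookup that Importer.setup() performs; it is a hypothetical standalone example, not part of the patch, and the class name, row key, and column are placeholders (an HBase 0.94 client on the classpath is assumed):

    import java.util.UUID;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.HConnection;
    import org.apache.hadoop.hbase.client.HConnectionManager;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;

    public class ClusterIdStampSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // Resolve the UUID of the cluster this configuration points at.
        HConnection connection = HConnectionManager.getConnection(conf);
        ZooKeeperWatcher zkw = connection.getZooKeeperWatcher();
        ReplicationZookeeper zkHelper = new ReplicationZookeeper(connection, conf, zkw);
        UUID clusterId = zkHelper.getUUIDForCluster(zkw);

        // Stamp the UUID onto a mutation, the same call Importer.map() now makes.
        Put put = new Put(Bytes.toBytes("row1"));  // placeholder row key
        put.add(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("v"));  // placeholder cell
        put.setClusterId(clusterId);  // tolerates null after the Mutation.java change below
      }
    }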

Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/client/Mutation.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/client/Mutation.java?rev=1424192&r1=1424191&r2=1424192&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/client/Mutation.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/client/Mutation.java Wed Dec 19 22:45:41 2012
@@ -193,6 +193,7 @@ public abstract class Mutation extends O
    * @param clusterId
    */
   public void setClusterId(UUID clusterId) {
+    if (clusterId == null) return;
     byte[] val = new byte[2*Bytes.SIZEOF_LONG];
     Bytes.putLong(val, 0, clusterId.getMostSignificantBits());
     Bytes.putLong(val, Bytes.SIZEOF_LONG, clusterId.getLeastSignificantBits());

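The null guard above pairs with the Import change below: if the ZooKeeper lookup in Importer.setup() fails, clusterId stays null and map() still calls setClusterId(clusterId). A trivial hypothetical snippet (not part of the patch) showing the behavioral change:

    Put put = new Put(Bytes.toBytes("r"));  // placeholder row key
    put.setClusterId(null);  // before: NullPointerException at clusterId.getMostSignificantBits();
                             // after: returns immediately, no cluster id is set
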
Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java?rev=1424192&r1=1424191&r2=1424192&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java Wed Dec 19 22:45:41 2012
@@ -22,23 +22,32 @@ package org.apache.hadoop.hbase.mapreduc
 import java.io.IOException;
 import java.util.Map;
 import java.util.TreeMap;
+import java.util.UUID;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.ZooKeeperConnectionException;
 import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HConnectionManager;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.zookeeper.KeeperException;
 
 /**
  * Import data written by {@link Export}.
@@ -47,6 +56,7 @@ public class Import {
   final static String NAME = "import";
   final static String CF_RENAME_PROP = "HBASE_IMPORTER_RENAME_CFS";
   final static String BULK_OUTPUT_CONF_KEY = "import.bulk.output";
+  private static final Log LOG = LogFactory.getLog(Import.class);
 
   /**
    * A mapper that just writes out KeyValues.
@@ -88,6 +98,7 @@
   static class Importer
   extends TableMapper<ImmutableBytesWritable, Mutation> {
     private Map<byte[], byte[]> cfRenameMap;
+    private UUID clusterId;
       
     /**
      * @param row  The current table row key.
@@ -128,16 +139,32 @@
         }
       }
       if (put != null) {
+        put.setClusterId(clusterId);
         context.write(key, put);
       }
       if (delete != null) {
+        delete.setClusterId(clusterId);
         context.write(key, delete);
       }
     }
 
     @Override
     public void setup(Context context) {
-      cfRenameMap = createCfRenameMap(context.getConfiguration());
+      Configuration conf = context.getConfiguration();
+      cfRenameMap = createCfRenameMap(conf);
+      try {
+        HConnection connection = HConnectionManager.getConnection(conf);
+        ZooKeeperWatcher zkw = connection.getZooKeeperWatcher();
+        ReplicationZookeeper zkHelper = new ReplicationZookeeper(connection, conf, zkw);
+        clusterId = zkHelper.getUUIDForCluster(zkw);
+      } catch (ZooKeeperConnectionException e) {
+        LOG.error("Problem connecting to ZooKeeper during task setup", e);
+      } catch (KeeperException e) {
+        LOG.error("Problem reading ZooKeeper data during task setup", e);
+      } catch (IOException e) {
+        LOG.error("Problem setting up task", e);
+      }
+
     }
   }
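
For completeness, the importer is typically launched as below (usage sketch; the table name and input directory are placeholders):

    $ bin/hbase org.apache.hadoop.hbase.mapreduce.Import myTable /export/myTable

With this patch each map task resolves the cluster UUID once in setup() and applies it to every mutation it writes; failures during the lookup are only logged, in which case the emitted mutations simply carry no cluster id.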