You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by jd...@apache.org on 2012/12/19 23:46:07 UTC

svn commit: r1424193 - in /hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase: client/Mutation.java mapreduce/Import.java

Author: jdcryans
Date: Wed Dec 19 22:46:07 2012
New Revision: 1424193

URL: http://svn.apache.org/viewvc?rev=1424193&view=rev
Log:
HBASE-7158  Allow CopyTable to identify the source cluster (for replication scenarios)

Modified:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/Mutation.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/Mutation.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/Mutation.java?rev=1424193&r1=1424192&r2=1424193&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/Mutation.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/Mutation.java Wed Dec 19 22:46:07 2012
@@ -194,6 +194,7 @@ public abstract class Mutation extends O
    * @param clusterId
    */
   public void setClusterId(UUID clusterId) {
+    if (clusterId == null) return;
     byte[] val = new byte[2*Bytes.SIZEOF_LONG];
     Bytes.putLong(val, 0, clusterId.getMostSignificantBits());
     Bytes.putLong(val, Bytes.SIZEOF_LONG, clusterId.getLeastSignificantBits());

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java?rev=1424193&r1=1424192&r2=1424193&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java Wed Dec 19 22:46:07 2012
@@ -21,25 +21,28 @@ package org.apache.hadoop.hbase.mapreduc
 import java.io.IOException;
 import java.util.Map;
 import java.util.TreeMap;
+import java.util.UUID;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.ZooKeeperConnectionException;
+import org.apache.hadoop.hbase.client.*;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.zookeeper.KeeperException;
 
 /**
  * Import data written by {@link Export}.
@@ -50,6 +53,7 @@ public class Import {
   final static String NAME = "import";
   final static String CF_RENAME_PROP = "HBASE_IMPORTER_RENAME_CFS";
   final static String BULK_OUTPUT_CONF_KEY = "import.bulk.output";
+  private static final Log LOG = LogFactory.getLog(Import.class);
 
   /**
    * A mapper that just writes out KeyValues.
@@ -91,6 +95,7 @@ public class Import {
   static class Importer
   extends TableMapper<ImmutableBytesWritable, Mutation> {
     private Map<byte[], byte[]> cfRenameMap;
+    private UUID clusterId;
       
     /**
      * @param row  The current table row key.
@@ -131,16 +136,32 @@ public class Import {
         }
       }
       if (put != null) {
+        put.setClusterId(clusterId);
         context.write(key, put);
       }
       if (delete != null) {
+        delete.setClusterId(clusterId);
         context.write(key, delete);
       }
     }
 
     @Override
     public void setup(Context context) {
-      cfRenameMap = createCfRenameMap(context.getConfiguration());
+      Configuration conf = context.getConfiguration();
+      cfRenameMap = createCfRenameMap(conf);
+      try {
+        HConnection connection = HConnectionManager.getConnection(conf);
+        ZooKeeperWatcher zkw = connection.getZooKeeperWatcher();
+        ReplicationZookeeper zkHelper = new ReplicationZookeeper(connection, conf, zkw);
+        clusterId = zkHelper.getUUIDForCluster(zkw);
+      } catch (ZooKeeperConnectionException e) {
+        LOG.error("Problem connecting to ZooKeper during task setup", e);
+      } catch (KeeperException e) {
+        LOG.error("Problem reading ZooKeeper data during task setup", e);
+      } catch (IOException e) {
+        LOG.error("Problem setting up task", e);
+      }
+
     }
   }