You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by jd...@apache.org on 2012/12/19 23:46:07 UTC
svn commit: r1424193 - in
/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase:
client/Mutation.java mapreduce/Import.java
Author: jdcryans
Date: Wed Dec 19 22:46:07 2012
New Revision: 1424193
URL: http://svn.apache.org/viewvc?rev=1424193&view=rev
Log:
HBASE-7158 Allow CopyTable to identify the source cluster (for replication scenarios)
Modified:
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/Mutation.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/Mutation.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/Mutation.java?rev=1424193&r1=1424192&r2=1424193&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/Mutation.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/Mutation.java Wed Dec 19 22:46:07 2012
@@ -194,6 +194,7 @@ public abstract class Mutation extends O
* @param clusterId
*/
public void setClusterId(UUID clusterId) {
+ if (clusterId == null) return;
byte[] val = new byte[2*Bytes.SIZEOF_LONG];
Bytes.putLong(val, 0, clusterId.getMostSignificantBits());
Bytes.putLong(val, Bytes.SIZEOF_LONG, clusterId.getLeastSignificantBits());
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java?rev=1424193&r1=1424192&r2=1424193&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java Wed Dec 19 22:46:07 2012
@@ -21,25 +21,28 @@ package org.apache.hadoop.hbase.mapreduc
import java.io.IOException;
import java.util.Map;
import java.util.TreeMap;
+import java.util.UUID;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.ZooKeeperConnectionException;
+import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.zookeeper.KeeperException;
/**
* Import data written by {@link Export}.
@@ -50,6 +53,7 @@ public class Import {
final static String NAME = "import";
final static String CF_RENAME_PROP = "HBASE_IMPORTER_RENAME_CFS";
final static String BULK_OUTPUT_CONF_KEY = "import.bulk.output";
+ private static final Log LOG = LogFactory.getLog(Import.class);
/**
* A mapper that just writes out KeyValues.
@@ -91,6 +95,7 @@ public class Import {
static class Importer
extends TableMapper<ImmutableBytesWritable, Mutation> {
private Map<byte[], byte[]> cfRenameMap;
+ private UUID clusterId;
/**
* @param row The current table row key.
@@ -131,16 +136,32 @@ public class Import {
}
}
if (put != null) {
+ put.setClusterId(clusterId);
context.write(key, put);
}
if (delete != null) {
+ delete.setClusterId(clusterId);
context.write(key, delete);
}
}
@Override
public void setup(Context context) {
- cfRenameMap = createCfRenameMap(context.getConfiguration());
+ Configuration conf = context.getConfiguration();
+ cfRenameMap = createCfRenameMap(conf);
+ try {
+ HConnection connection = HConnectionManager.getConnection(conf);
+ ZooKeeperWatcher zkw = connection.getZooKeeperWatcher();
+ ReplicationZookeeper zkHelper = new ReplicationZookeeper(connection, conf, zkw);
+ clusterId = zkHelper.getUUIDForCluster(zkw);
+ } catch (ZooKeeperConnectionException e) {
+ LOG.error("Problem connecting to ZooKeeper during task setup", e);
+ } catch (KeeperException e) {
+ LOG.error("Problem reading ZooKeeper data during task setup", e);
+ } catch (IOException e) {
+ LOG.error("Problem setting up task", e);
+ }
+
}
}