You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by jd...@apache.org on 2012/12/19 23:45:41 UTC
svn commit: r1424192 - in /hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase: client/Mutation.java mapreduce/Import.java
Author: jdcryans
Date: Wed Dec 19 22:45:41 2012
New Revision: 1424192
URL: http://svn.apache.org/viewvc?rev=1424192&view=rev
Log:
HBASE-7158 Allow CopyTable to identify the source cluster (for replication scenarios)
Modified:
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/client/Mutation.java
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java
Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/client/Mutation.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/client/Mutation.java?rev=1424192&r1=1424191&r2=1424192&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/client/Mutation.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/client/Mutation.java Wed Dec 19 22:45:41 2012
@@ -193,6 +193,7 @@ public abstract class Mutation extends O
* @param clusterId
*/
public void setClusterId(UUID clusterId) {
+ if (clusterId == null) return;
byte[] val = new byte[2*Bytes.SIZEOF_LONG];
Bytes.putLong(val, 0, clusterId.getMostSignificantBits());
Bytes.putLong(val, Bytes.SIZEOF_LONG, clusterId.getLeastSignificantBits());
Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java?rev=1424192&r1=1424191&r2=1424192&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java Wed Dec 19 22:45:41 2012
@@ -22,23 +22,30 @@ package org.apache.hadoop.hbase.mapreduc
import java.io.IOException;
import java.util.Map;
import java.util.TreeMap;
+import java.util.UUID;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.zookeeper.KeeperException;
/**
* Import data written by {@link Export}.
@@ -47,6 +54,7 @@ public class Import {
final static String NAME = "import";
final static String CF_RENAME_PROP = "HBASE_IMPORTER_RENAME_CFS";
final static String BULK_OUTPUT_CONF_KEY = "import.bulk.output";
+ private static final Log LOG = LogFactory.getLog(Import.class);
/**
* A mapper that just writes out KeyValues.
@@ -88,6 +96,7 @@ public class Import {
static class Importer
extends TableMapper<ImmutableBytesWritable, Mutation> {
private Map<byte[], byte[]> cfRenameMap;
+ private UUID clusterId;
/**
* @param row The current table row key.
@@ -128,16 +137,32 @@ public class Import {
}
}
if (put != null) {
+ put.setClusterId(clusterId);
context.write(key, put);
}
if (delete != null) {
+ delete.setClusterId(clusterId);
context.write(key, delete);
}
}
@Override
public void setup(Context context) {
- cfRenameMap = createCfRenameMap(context.getConfiguration());
+ Configuration conf = context.getConfiguration();
+ cfRenameMap = createCfRenameMap(conf);
+ try {
+ HConnection connection = HConnectionManager.getConnection(conf);
+ ZooKeeperWatcher zkw = connection.getZooKeeperWatcher();
+ ReplicationZookeeper zkHelper = new ReplicationZookeeper(connection, conf, zkw);
+ clusterId = zkHelper.getUUIDForCluster(zkw);
+ } catch (ZooKeeperConnectionException e) {
+ LOG.error("Problem connecting to ZooKeper during task setup", e);
+ } catch (KeeperException e) {
+ LOG.error("Problem reading ZooKeeper data during task setup", e);
+ } catch (IOException e) {
+ LOG.error("Problem setting up task", e);
+ }
+
}
}