You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ma...@apache.org on 2016/10/13 12:40:11 UTC
sqoop git commit: SQOOP-2952: Fixing bug (row key not added into column family using --hbase-bulkload)
Repository: sqoop
Updated Branches:
refs/heads/trunk 0f13c474b -> b4afcf417
SQOOP-2952: Fixing bug (row key not added into column family using --hbase-bulkload)
(Szabolcs Vasas via Attila Szabo)
Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/b4afcf41
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/b4afcf41
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/b4afcf41
Branch: refs/heads/trunk
Commit: b4afcf4179b13c25b5e9bd182d75cab5d2e6c8d1
Parents: 0f13c47
Author: Attila Szabo <ma...@apache.org>
Authored: Thu Oct 13 14:38:21 2016 +0200
Committer: Attila Szabo <ma...@apache.org>
Committed: Thu Oct 13 14:38:21 2016 +0200
----------------------------------------------------------------------
build.xml | 14 ++
ivy.xml | 23 ++++
.../apache/sqoop/hbase/HBasePutProcessor.java | 32 ++---
.../org/apache/sqoop/hbase/PutTransformer.java | 4 +
.../sqoop/hbase/ToStringPutTransformer.java | 30 ++++-
.../sqoop/mapreduce/HBaseBulkImportMapper.java | 3 +-
.../sqoop/hbase/HBaseImportAddRowKeyTest.java | 128 ++++++++++++++-----
.../com/cloudera/sqoop/hbase/HBaseTestCase.java | 25 ++--
8 files changed, 175 insertions(+), 84 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/sqoop/blob/b4afcf41/build.xml
----------------------------------------------------------------------
diff --git a/build.xml b/build.xml
index 97e5502..7f948b3 100644
--- a/build.xml
+++ b/build.xml
@@ -185,6 +185,20 @@
<property name="avrohadoopprofile" value="2" />
</then>
</elseif>
+
+ <!-- hadoopversion 260 is created for testing purposes only. Do not use it in production! -->
+ <elseif>
+ <equals arg1="${hadoopversion}" arg2="260" />
+ <then>
+ <property name="hadoop.version" value="2.6.0" />
+ <property name="hbase95.version" value="1.2.0" />
+ <property name="zookeeper.version" value="3.4.5" />
+ <property name="hadoop.version.full" value="2.6.0" />
+ <property name="hcatalog.version" value="0.13.0" />
+ <property name="hbasecompatprofile" value="2" />
+ <property name="avrohadoopprofile" value="2" />
+ </then>
+ </elseif>
<else>
<fail message="Unrecognized hadoopversion. Can only be 20, 23, 100, 200 or 210." />
</else>
http://git-wip-us.apache.org/repos/asf/sqoop/blob/b4afcf41/ivy.xml
----------------------------------------------------------------------
diff --git a/ivy.xml b/ivy.xml
index a502530..ee1dafa 100644
--- a/ivy.xml
+++ b/ivy.xml
@@ -55,6 +55,8 @@ under the License.
extends="common,runtime,avro,hbase${hbaseprofile},hcatalog${hcatprofile},accumulo" />
<conf name="hadoop210" visibility="private"
extends="common,runtime,avro,hbase${hbaseprofile},hcatalog${hcatprofile},accumulo" />
+ <conf name="hadoop260" visibility="private"
+ extends="common,runtime,avro,hbase${hbaseprofile},hcatalog${hcatprofile},accumulo" />
<conf name="test" visibility="private" extends="common,runtime"/>
<conf name="hadoop23test" visibility="private" extends="test,hadoop23" />
@@ -62,6 +64,7 @@ under the License.
<conf name="hadoop100test" visibility="private" extends="test,hadoop100" />
<conf name="hadoop200test" visibility="private" extends="test,hadoop200" />
<conf name="hadoop210test" visibility="private" extends="test,hadoop210" />
+ <conf name="hadoop260test" visibility="private" extends="test,hadoop260" />
<!-- We don't redistribute everything we depend on (e.g., Hadoop itself);
anything which Hadoop itself also depends on, we do not ship.
@@ -105,6 +108,26 @@ under the License.
<dependency org="org.aspectj" name="aspectjrt" rev="${aspectj.version}"
conf="hadoop210->default"/>
+ <!-- Dependencies for Hadoop 2.6.0 -->
+ <dependency org="org.apache.hadoop" name="hadoop-common"
+ rev="${hadoop.version}" conf="hadoop260->default">
+ <artifact name="hadoop-common" type="jar" />
+ <artifact name="hadoop-common" type="jar" m:classifier="tests"/>
+ </dependency>
+ <dependency org="org.apache.hadoop" name="hadoop-hdfs"
+ rev="${hadoop.version}" conf="hadoop260->default">
+ <artifact name="hadoop-hdfs" type="jar" />
+ <artifact name="hadoop-hdfs" type="jar" m:classifier="tests"/>
+ </dependency>
+ <dependency org="org.apache.hadoop" name="hadoop-mapreduce-client-common"
+ rev="${hadoop.version}" conf="hadoop260->default"/>
+ <dependency org="org.apache.hadoop" name="hadoop-mapreduce-client-core"
+ rev="${hadoop.version}" conf="hadoop260->default"/>
+ <dependency org="org.aspectj" name="aspectjtools" rev="${aspectj.version}"
+ conf="hadoop260->default"/>
+ <dependency org="org.aspectj" name="aspectjrt" rev="${aspectj.version}"
+ conf="hadoop260->default"/>
+
<!-- Dependencies for Hadoop 2.0.0 -->
<dependency org="org.apache.hadoop" name="hadoop-common"
rev="${hadoop.version}" conf="hadoop200->default">
http://git-wip-us.apache.org/repos/asf/sqoop/blob/b4afcf41/src/java/org/apache/sqoop/hbase/HBasePutProcessor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/hbase/HBasePutProcessor.java b/src/java/org/apache/sqoop/hbase/HBasePutProcessor.java
index b2431ac..fdbe127 100644
--- a/src/java/org/apache/sqoop/hbase/HBasePutProcessor.java
+++ b/src/java/org/apache/sqoop/hbase/HBasePutProcessor.java
@@ -18,11 +18,9 @@
package org.apache.sqoop.hbase;
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
+import com.cloudera.sqoop.lib.FieldMapProcessor;
+import com.cloudera.sqoop.lib.FieldMappable;
+import com.cloudera.sqoop.lib.ProcessingException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configurable;
@@ -31,11 +29,11 @@ import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.sqoop.mapreduce.ImportJobBase;
-import com.cloudera.sqoop.lib.FieldMappable;
-import com.cloudera.sqoop.lib.FieldMapProcessor;
-import com.cloudera.sqoop.lib.ProcessingException;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
/**
* SqoopRecordProcessor that performs an HBase "put" operation
@@ -105,21 +103,7 @@ public class HBasePutProcessor implements Closeable, Configurable,
if (null == putTransformer) {
throw new RuntimeException("Could not instantiate PutTransformer.");
}
-
- this.putTransformer.setColumnFamily(conf.get(COL_FAMILY_KEY, null));
- this.putTransformer.setRowKeyColumn(conf.get(ROW_KEY_COLUMN_KEY, null));
-
- if (this.putTransformer instanceof ToStringPutTransformer) {
- ToStringPutTransformer stringPutTransformer =
- (ToStringPutTransformer) this.putTransformer;
- stringPutTransformer.bigDecimalFormatString =
- conf.getBoolean(ImportJobBase.PROPERTY_BIGDECIMAL_FORMAT,
- ImportJobBase.PROPERTY_BIGDECIMAL_FORMAT_DEFAULT);
- stringPutTransformer.addRowKey =
- conf.getBoolean(HBasePutProcessor.ADD_ROW_KEY,
- HBasePutProcessor.ADD_ROW_KEY_DEFAULT);
- stringPutTransformer.detectCompositeKey();
- }
+ putTransformer.init(conf);
this.tableName = conf.get(TABLE_NAME_KEY, null);
try {
http://git-wip-us.apache.org/repos/asf/sqoop/blob/b4afcf41/src/java/org/apache/sqoop/hbase/PutTransformer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/hbase/PutTransformer.java b/src/java/org/apache/sqoop/hbase/PutTransformer.java
index 8d6bcac..533467e 100644
--- a/src/java/org/apache/sqoop/hbase/PutTransformer.java
+++ b/src/java/org/apache/sqoop/hbase/PutTransformer.java
@@ -22,6 +22,8 @@ import java.io.IOException;
import java.util.List;
import java.util.Map;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Put;
/**
@@ -33,6 +35,8 @@ public abstract class PutTransformer {
private String columnFamily;
private String rowKeyColumn;
+ public abstract void init(Configuration conf);
+
/**
* @return the default column family to insert into.
*/
http://git-wip-us.apache.org/repos/asf/sqoop/blob/b4afcf41/src/java/org/apache/sqoop/hbase/ToStringPutTransformer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/hbase/ToStringPutTransformer.java b/src/java/org/apache/sqoop/hbase/ToStringPutTransformer.java
index b5cad1d..363e145 100644
--- a/src/java/org/apache/sqoop/hbase/ToStringPutTransformer.java
+++ b/src/java/org/apache/sqoop/hbase/ToStringPutTransformer.java
@@ -18,6 +18,15 @@
package org.apache.sqoop.hbase;
+import com.cloudera.sqoop.hbase.PutTransformer;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.sqoop.mapreduce.ImportJobBase;
+
import java.io.IOException;
import java.math.BigDecimal;
import java.util.ArrayList;
@@ -27,13 +36,10 @@ import java.util.List;
import java.util.Map;
import java.util.TreeMap;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.util.StringUtils;
-
-import com.cloudera.sqoop.hbase.PutTransformer;
+import static org.apache.sqoop.hbase.HBasePutProcessor.ADD_ROW_KEY;
+import static org.apache.sqoop.hbase.HBasePutProcessor.ADD_ROW_KEY_DEFAULT;
+import static org.apache.sqoop.hbase.HBasePutProcessor.COL_FAMILY_KEY;
+import static org.apache.sqoop.hbase.HBasePutProcessor.ROW_KEY_COLUMN_KEY;
/**
* PutTransformer that calls toString on all non-null fields.
@@ -204,4 +210,14 @@ public class ToStringPutTransformer extends PutTransformer {
return valString;
}
+ @Override
+ public void init(Configuration conf) {
+ setColumnFamily(conf.get(COL_FAMILY_KEY, null));
+ setRowKeyColumn(conf.get(ROW_KEY_COLUMN_KEY, null));
+
+ this.bigDecimalFormatString = conf.getBoolean(ImportJobBase.PROPERTY_BIGDECIMAL_FORMAT,
+ ImportJobBase.PROPERTY_BIGDECIMAL_FORMAT_DEFAULT);
+ this.addRowKey = conf.getBoolean(ADD_ROW_KEY, ADD_ROW_KEY_DEFAULT);
+ detectCompositeKey();
+ }
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/b4afcf41/src/java/org/apache/sqoop/mapreduce/HBaseBulkImportMapper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/HBaseBulkImportMapper.java b/src/java/org/apache/sqoop/mapreduce/HBaseBulkImportMapper.java
index 363b5d7..58ccee7 100644
--- a/src/java/org/apache/sqoop/mapreduce/HBaseBulkImportMapper.java
+++ b/src/java/org/apache/sqoop/mapreduce/HBaseBulkImportMapper.java
@@ -66,8 +66,7 @@ public class HBaseBulkImportMapper
if (null == putTransformer) {
throw new RuntimeException("Could not instantiate PutTransformer.");
}
- this.putTransformer.setColumnFamily(conf.get(COL_FAMILY_KEY, null));
- this.putTransformer.setRowKeyColumn(conf.get(ROW_KEY_COLUMN_KEY, null));
+ putTransformer.init(conf);
}
@Override
public void map(LongWritable key, SqoopRecord val, Context context)
http://git-wip-us.apache.org/repos/asf/sqoop/blob/b4afcf41/src/test/com/cloudera/sqoop/hbase/HBaseImportAddRowKeyTest.java
----------------------------------------------------------------------
diff --git a/src/test/com/cloudera/sqoop/hbase/HBaseImportAddRowKeyTest.java b/src/test/com/cloudera/sqoop/hbase/HBaseImportAddRowKeyTest.java
index cfbb1d3..abf9f1c 100644
--- a/src/test/com/cloudera/sqoop/hbase/HBaseImportAddRowKeyTest.java
+++ b/src/test/com/cloudera/sqoop/hbase/HBaseImportAddRowKeyTest.java
@@ -18,68 +18,126 @@
package com.cloudera.sqoop.hbase;
+import junit.framework.JUnit4TestAdapter;
+import org.apache.commons.lang.StringUtils;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
-import org.junit.Test;
+import static java.util.Arrays.asList;
+import static org.apache.commons.lang.StringUtils.join;
-/**
- *
- */
+@RunWith(Parameterized.class)
public class HBaseImportAddRowKeyTest extends HBaseTestCase {
+ @Parameterized.Parameters(name = "bulkLoad = {0}")
+ public static Iterable<? extends Object> bulkLoadParameters() {
+ return Arrays.asList(new Boolean[] { false } , new Boolean[] { true } );
+ }
+
+ private String[] columnTypes;
+
+ private String[] columnValues;
+
+ private String hbaseTableName;
+
+ private String hbaseColumnFamily;
+
+ private String hbaseTmpDir;
+
+ private String hbaseBulkLoadDir;
+
+ private boolean bulkLoad;
+
+ public HBaseImportAddRowKeyTest(boolean bulkLoad) {
+ this.bulkLoad = bulkLoad;
+ }
+
+ @Before
+ public void setUp() {
+ super.setUp();
+ columnTypes = new String[] { "INT", "INT" };
+ columnValues = new String[] { "0", "1" };
+ hbaseTableName = "addRowKeyTable";
+ hbaseColumnFamily = "addRowKeyFamily";
+ hbaseTmpDir = TEMP_BASE_DIR + "hbaseTmpDir";
+ hbaseBulkLoadDir = TEMP_BASE_DIR + "hbaseBulkLoadDir";
+ createTableWithColTypes(columnTypes, columnValues);
+ }
+
@Test
public void testAddRowKey() throws IOException {
- String[] types = { "INT", "INT" };
- String[] vals = { "0", "1" };
- createTableWithColTypes(types, vals);
-
- String[] otherArg = getArgv(true, "addRowKeyT", "addRowKeyF", true, null);
- String[] argv = new String[otherArg.length + 2];
- argv[0] = "-D";
- argv[1] = "sqoop.hbase.add.row.key=true";
- System.arraycopy(otherArg, 0, argv, 2, otherArg.length);
+ String[] argv = getImportArguments(true, hbaseTableName, hbaseColumnFamily);
runImport(argv);
// Row key should have been added
- verifyHBaseCell("addRowKeyT", "0", "addRowKeyF", getColName(0), "0");
- verifyHBaseCell("addRowKeyT", "0", "addRowKeyF", getColName(1), "1");
+ verifyHBaseCell(hbaseTableName, columnValues[0], hbaseColumnFamily, getColName(0), columnValues[0]);
+ verifyHBaseCell(hbaseTableName, columnValues[0], hbaseColumnFamily, getColName(1), columnValues[1]);
}
@Test
public void testAddRowKeyDefault() throws IOException {
- String[] types = { "INT", "INT" };
- String[] vals = { "0", "1" };
- createTableWithColTypes(types, vals);
-
- String[] argv = getArgv(true, "addRowKeyDfT", "addRowKeyDfF", true, null);
+ String[] argv = getImportArguments(false, hbaseTableName, hbaseColumnFamily);
runImport(argv);
// Row key should not be added by default
- verifyHBaseCell("addRowKeyDfT", "0", "addRowKeyDfF", getColName(0), null);
- verifyHBaseCell("addRowKeyDfT", "0", "addRowKeyDfF", getColName(1), "1");
+ verifyHBaseCell(hbaseTableName, columnValues[0], hbaseColumnFamily, getColName(0), null);
+ verifyHBaseCell(hbaseTableName, columnValues[0], hbaseColumnFamily, getColName(1), columnValues[1]);
}
@Test
public void testAddCompositeKey() throws IOException {
- String[] types = { "INT", "INT" };
- String[] vals = { "0", "1" };
- createTableWithColTypes(types, vals);
-
- String[] otherArg = getArgv(true, "addRowKeyT", "addRowKeyF", true, null);
- String[] argv = new String[otherArg.length + 4];
- argv[0]="-D";
- argv[1]="sqoop.hbase.add.row.key=true";
- System.arraycopy(otherArg, 0, argv, 2, otherArg.length);
- argv[argv.length - 2] = "--hbase-row-key";
- argv[argv.length - 1] = getColName(0)+","+getColName(1);
+ String rowKey = getColName(0)+","+getColName(1);
+
+ String[] argv = getImportArguments(true, hbaseTableName, hbaseColumnFamily, rowKey);
runImport(argv);
// Row key should have been added
- verifyHBaseCell("addRowKeyT", "0_1", "addRowKeyF", getColName(0), "0");
- verifyHBaseCell("addRowKeyT", "0_1", "addRowKeyF", getColName(1), "1");
+ verifyHBaseCell(hbaseTableName, join(columnValues, '_'), hbaseColumnFamily, getColName(0), columnValues[0]);
+ verifyHBaseCell(hbaseTableName, join(columnValues, '_'), hbaseColumnFamily, getColName(1), columnValues[1]);
+ }
+
+ private String[] getImportArguments(boolean addRowKey, String hbaseTableName, String hbaseColumnFamily) {
+ return getImportArguments(addRowKey, hbaseTableName, hbaseColumnFamily, null);
+ }
+
+ private String[] getImportArguments(boolean addRowKey, String hbaseTableName, String hbaseColumnFamily, String rowKey) {
+ List<String> result = new ArrayList<>();
+
+ if (addRowKey) {
+ result.add("-D");
+ result.add("sqoop.hbase.add.row.key=true");
+ }
+ result.add("-D");
+ result.add("hbase.fs.tmp.dir=" + hbaseTmpDir);
+
+ result.addAll(asList(getArgv(true, hbaseTableName, hbaseColumnFamily, true, null)));
+
+ if(bulkLoad) {
+ result.add("--target-dir");
+ result.add(hbaseBulkLoadDir);
+ result.add("--hbase-bulkload");
+ }
+
+ if (!StringUtils.isBlank(rowKey)) {
+ result.add("--hbase-row-key");
+ result.add(rowKey);
+ }
+
+ return result.toArray(new String[result.size()]);
+ }
+
+ public static junit.framework.Test suite() {
+ return new JUnit4TestAdapter(HBaseImportAddRowKeyTest.class);
}
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/b4afcf41/src/test/com/cloudera/sqoop/hbase/HBaseTestCase.java
----------------------------------------------------------------------
diff --git a/src/test/com/cloudera/sqoop/hbase/HBaseTestCase.java b/src/test/com/cloudera/sqoop/hbase/HBaseTestCase.java
index 37dc004..ad92a07 100644
--- a/src/test/com/cloudera/sqoop/hbase/HBaseTestCase.java
+++ b/src/test/com/cloudera/sqoop/hbase/HBaseTestCase.java
@@ -88,7 +88,7 @@ public abstract class HBaseTestCase extends ImportJobTestCase {
if (includeHadoopFlags) {
CommonArgs.addHadoopFlags(args);
args.add("-D");
- args.add("hbase.zookeeper.property.clientPort=21818");
+ args.add("hbase.zookeeper.property.clientPort=" + zookeeperPort);
}
if (null != queryStr) {
@@ -120,40 +120,33 @@ public abstract class HBaseTestCase extends ImportJobTestCase {
private String workDir = createTempDir().getAbsolutePath();
private MiniZooKeeperCluster zookeeperCluster;
private MiniHBaseCluster hbaseCluster;
+ private int zookeeperPort;
@Override
@Before
public void setUp() {
try {
+ String zookeeperDir = new File(workDir, "zk").getAbsolutePath();
+ zookeeperCluster = new MiniZooKeeperCluster();
+ zookeeperCluster.startup(new File(zookeeperDir));
+ zookeeperPort = zookeeperCluster.getClientPort();
+
HBaseTestCase.recordTestBuildDataProperty();
String hbaseDir = new File(workDir, "hbase").getAbsolutePath();
String hbaseRoot = "file://" + hbaseDir;
Configuration hbaseConf = HBaseConfiguration.create();
hbaseConf.set(HConstants.HBASE_DIR, hbaseRoot);
//Hbase 0.90 does not have HConstants.ZOOKEEPER_CLIENT_PORT
- hbaseConf.setInt("hbase.zookeeper.property.clientPort", 21818);
+ hbaseConf.setInt("hbase.zookeeper.property.clientPort", zookeeperPort);
hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, "0.0.0.0");
hbaseConf.setInt("hbase.master.info.port", -1);
hbaseConf.setInt("hbase.zookeeper.property.maxClientCnxns", 500);
- String zookeeperDir = new File(workDir, "zk").getAbsolutePath();
- int zookeeperPort = 21818;
- zookeeperCluster = new MiniZooKeeperCluster();
- Method m;
- Class<?> zkParam[] = {Integer.TYPE};
- try {
- m = MiniZooKeeperCluster.class.getDeclaredMethod("setDefaultClientPort",
- zkParam);
- } catch (NoSuchMethodException e) {
- m = MiniZooKeeperCluster.class.getDeclaredMethod("setClientPort",
- zkParam);
- }
- m.invoke(zookeeperCluster, new Object[]{new Integer(zookeeperPort)});
- zookeeperCluster.startup(new File(zookeeperDir));
hbaseCluster = new MiniHBaseCluster(hbaseConf, 1);
HMaster master = hbaseCluster.getMaster();
Object serverName = master.getServerName();
String hostAndPort;
+ Method m;
if (serverName instanceof String) {
System.out.println("Server name is string, using HServerAddress.");
m = HMaster.class.getDeclaredMethod("getMasterAddress",