Posted to commits@sqoop.apache.org by an...@apache.org on 2017/08/03 13:25:27 UTC
sqoop git commit: SQOOP-3149: Sqoop incremental import - NULL column updates are not pulled into HBase table
Repository: sqoop
Updated Branches:
refs/heads/trunk 46f9e2d9d -> ba058ac33
SQOOP-3149: Sqoop incremental import - NULL column updates are not pulled into HBase table
(Jilani Shaik via Anna Szonyi)
Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/ba058ac3
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/ba058ac3
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/ba058ac3
Branch: refs/heads/trunk
Commit: ba058ac3326aed16eca30c42faa7f0cb391fb299
Parents: 46f9e2d
Author: Anna Szonyi <an...@apache.org>
Authored: Thu Aug 3 15:25:36 2017 +0200
Committer: Anna Szonyi <an...@apache.org>
Committed: Thu Aug 3 15:25:36 2017 +0200
----------------------------------------------------------------------
.../apache/sqoop/hbase/HBasePutProcessor.java | 34 +++--
.../org/apache/sqoop/hbase/PutTransformer.java | 5 +-
.../sqoop/hbase/ToStringPutTransformer.java | 31 +++--
.../sqoop/mapreduce/HBaseBulkImportMapper.java | 10 +-
.../cloudera/sqoop/hbase/HBaseImportTest.java | 54 ++++++++
.../com/cloudera/sqoop/hbase/HBaseTestCase.java | 58 +++++---
.../sqoop/testutil/BaseSqoopTestCase.java | 135 ++++++++++++++++++-
7 files changed, 282 insertions(+), 45 deletions(-)
----------------------------------------------------------------------
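The core of the patch: a source column that has gone NULL no longer leaves the old HBase cell in place. Instead of building a single Put per row, the transformer now emits one mutation per column, a Put for a non-null value and a Delete for a null value, so an incremental re-import clears the stale cell. The standalone sketch below only illustrates that per-column mapping; the class and method names are ours, not part of the patch.

import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

// Illustrative only: maps one column of an imported record to an HBase
// mutation, mirroring the behavior the patch adds to ToStringPutTransformer.
public final class NullColumnMutationSketch {
  static Mutation toMutation(byte[] rowKey, byte[] colFamily,
                             String colName, Object val) {
    if (val != null) {
      Put put = new Put(rowKey);
      put.addColumn(colFamily, Bytes.toBytes(colName),
          Bytes.toBytes(val.toString()));
      return put;                      // non-null value: write/overwrite the cell
    }
    Delete delete = new Delete(rowKey);
    delete.addColumn(colFamily, Bytes.toBytes(colName));
    return delete;                     // NULL value: remove the stale cell
  }
}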
http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba058ac3/src/java/org/apache/sqoop/hbase/HBasePutProcessor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/hbase/HBasePutProcessor.java b/src/java/org/apache/sqoop/hbase/HBasePutProcessor.java
index fdbe127..032fd38 100644
--- a/src/java/org/apache/sqoop/hbase/HBasePutProcessor.java
+++ b/src/java/org/apache/sqoop/hbase/HBasePutProcessor.java
@@ -25,7 +25,9 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.util.ReflectionUtils;
@@ -128,17 +130,27 @@ public class HBasePutProcessor implements Closeable, Configurable,
public void accept(FieldMappable record)
throws IOException, ProcessingException {
Map<String, Object> fields = record.getFieldMap();
-
- List<Put> putList = putTransformer.getPutCommand(fields);
- if (null != putList) {
- for (Put put : putList) {
- if (put!=null) {
- if (put.isEmpty()) {
- LOG.warn("Could not insert row with no columns "
- + "for row-key column: " + Bytes.toString(put.getRow()));
- } else {
- this.table.put(put);
- }
+ List<Mutation> mutationList = putTransformer.getMutationCommand(fields);
+ if (null != mutationList) {
+ for (Mutation mutation : mutationList) {
+ if (mutation!=null) {
+ if(mutation instanceof Put) {
+ Put putObject = (Put) mutation;
+ if (putObject.isEmpty()) {
+ LOG.warn("Could not insert row with no columns "
+ + "for row-key column: " + Bytes.toString(putObject.getRow()));
+ } else {
+ this.table.put(putObject);
+ }
+ } else if(mutation instanceof Delete) {
+ Delete deleteObject = (Delete) mutation;
+ if (deleteObject.isEmpty()) {
+ LOG.warn("Could not delete row with no columns "
+ + "for row-key column: " + Bytes.toString(deleteObject.getRow()));
+ } else {
+ this.table.delete(deleteObject);
+ }
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba058ac3/src/java/org/apache/sqoop/hbase/PutTransformer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/hbase/PutTransformer.java b/src/java/org/apache/sqoop/hbase/PutTransformer.java
index 533467e..c4496ee 100644
--- a/src/java/org/apache/sqoop/hbase/PutTransformer.java
+++ b/src/java/org/apache/sqoop/hbase/PutTransformer.java
@@ -22,9 +22,8 @@ import java.io.IOException;
import java.util.List;
import java.util.Map;
-import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Mutation;
/**
* Interface that takes a map of jdbc field names to values
@@ -71,7 +70,7 @@ public abstract class PutTransformer {
* @param fields a map of field names to values to insert.
* @return A list of Put commands that inserts these into HBase.
*/
- public abstract List<Put> getPutCommand(Map<String, Object> fields)
+ public abstract List<Mutation> getMutationCommand(Map<String, Object> fields)
throws IOException;
}
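Because the abstract contract changes from getPutCommand to getMutationCommand, anything that consumes a PutTransformer now has to handle Deletes as well as Puts. A hedged caller-side sketch, assuming an already-constructed HTable and mutation list (the helper class below is ours, not part of the patch):

import java.io.IOException;
import java.util.List;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;

// Illustrative only: applies the mixed Put/Delete list returned by
// getMutationCommand(), dispatching on the concrete mutation type.
public final class MutationDispatchSketch {
  static void apply(HTable table, List<Mutation> mutations) throws IOException {
    for (Mutation m : mutations) {
      if (m instanceof Put) {
        table.put((Put) m);          // non-null column values
      } else if (m instanceof Delete) {
        table.delete((Delete) m);    // NULL column values become cell deletes
      }
    }
  }
}

HBasePutProcessor.accept() above is the in-tree example of exactly this dispatch.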
http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba058ac3/src/java/org/apache/sqoop/hbase/ToStringPutTransformer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/hbase/ToStringPutTransformer.java b/src/java/org/apache/sqoop/hbase/ToStringPutTransformer.java
index 363e145..20bf1b9 100644
--- a/src/java/org/apache/sqoop/hbase/ToStringPutTransformer.java
+++ b/src/java/org/apache/sqoop/hbase/ToStringPutTransformer.java
@@ -22,6 +22,8 @@ import com.cloudera.sqoop.hbase.PutTransformer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.util.StringUtils;
@@ -106,7 +108,7 @@ public class ToStringPutTransformer extends PutTransformer {
@Override
/** {@inheritDoc} */
- public List<Put> getPutCommand(Map<String, Object> fields)
+ public List<Mutation> getMutationCommand(Map<String, Object> fields)
throws IOException {
String rowKeyCol = getRowKeyColumn();
@@ -140,7 +142,7 @@ public class ToStringPutTransformer extends PutTransformer {
// from composite key
String compositeRowKey = StringUtils.join(DELIMITER_HBASE, rowKeyList);
// Insert record in HBase
- return putRecordInHBase(fields, colFamily, compositeRowKey);
+ return mutationRecordInHBase(fields, colFamily, compositeRowKey);
} else {
// if row-key is regular primary key
@@ -154,23 +156,21 @@ public class ToStringPutTransformer extends PutTransformer {
}
String hBaseRowKey = toHBaseString(rowKey);
- return putRecordInHBase(fields, colFamily, hBaseRowKey);
+ return mutationRecordInHBase(fields, colFamily, hBaseRowKey);
}
}
/**
- * Performs actual Put operation for the specified record in HBase.
+ * Performs actual Put/delete operation for the specified record in HBase.
* @param record
* @param colFamily
* @param rowKey
- * @return List containing a single put command
+ * @return List containing a put/delete command
*/
- private List<Put> putRecordInHBase(Map<String, Object> record,
+ private List<Mutation> mutationRecordInHBase(Map<String, Object> record,
String colFamily, String rowKey) {
- // Put row-key in HBase
- Put put = new Put(Bytes.toBytes(rowKey));
byte[] colFamilyBytes = Bytes.toBytes(colFamily);
-
+ List<Mutation> mutationList = new ArrayList<Mutation>();
for (Map.Entry<String, Object> fieldEntry : record.entrySet()) {
String colName = fieldEntry.getKey();
boolean rowKeyCol = false;
@@ -187,17 +187,24 @@ public class ToStringPutTransformer extends PutTransformer {
// check addRowKey flag before including rowKey field.
Object val = fieldEntry.getValue();
if (null != val) {
+ // Put row-key in HBase
+ Put put = new Put(Bytes.toBytes(rowKey));
if ( val instanceof byte[]) {
- put.add(colFamilyBytes, getFieldNameBytes(colName),
+ put.addColumn(colFamilyBytes, getFieldNameBytes(colName),
(byte[])val);
} else {
- put.add(colFamilyBytes, getFieldNameBytes(colName),
+ put.addColumn(colFamilyBytes, getFieldNameBytes(colName),
Bytes.toBytes(toHBaseString(val)));
}
+ mutationList.add(put);
+ } else {
+ Delete delete = new Delete(Bytes.toBytes(rowKey));
+ delete.addColumn(colFamilyBytes, getFieldNameBytes(colName));
+ mutationList.add(delete);
}
}
}
- return Collections.singletonList(put);
+ return Collections.unmodifiableList(mutationList);
}
private String toHBaseString(Object val) {
http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba058ac3/src/java/org/apache/sqoop/mapreduce/HBaseBulkImportMapper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/mapreduce/HBaseBulkImportMapper.java b/src/java/org/apache/sqoop/mapreduce/HBaseBulkImportMapper.java
index 58ccee7..4b583dd 100644
--- a/src/java/org/apache/sqoop/mapreduce/HBaseBulkImportMapper.java
+++ b/src/java/org/apache/sqoop/mapreduce/HBaseBulkImportMapper.java
@@ -25,6 +25,7 @@ import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.LongWritable;
@@ -79,9 +80,12 @@ public class HBaseBulkImportMapper
}
Map<String, Object> fields = val.getFieldMap();
- List<Put> putList = putTransformer.getPutCommand(fields);
- for(Put put: putList){
- context.write(new ImmutableBytesWritable(put.getRow()), put);
+ List<Mutation> mutationList = putTransformer.getMutationCommand(fields);
+ for(Mutation mutation: mutationList){
+ if(mutation != null && mutation instanceof Put) {
+ Put putObject = (Put) mutation;
+ context.write(new ImmutableBytesWritable(putObject.getRow()), putObject);
+ }
}
}
@Override
http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba058ac3/src/test/com/cloudera/sqoop/hbase/HBaseImportTest.java
----------------------------------------------------------------------
diff --git a/src/test/com/cloudera/sqoop/hbase/HBaseImportTest.java b/src/test/com/cloudera/sqoop/hbase/HBaseImportTest.java
index fa14a01..4d79341 100644
--- a/src/test/com/cloudera/sqoop/hbase/HBaseImportTest.java
+++ b/src/test/com/cloudera/sqoop/hbase/HBaseImportTest.java
@@ -72,6 +72,60 @@ public class HBaseImportTest extends HBaseTestCase {
}
@Test
+ public void testOverwriteNullColumnsSucceeds() throws IOException {
+ // Test that we can create a table and then import immediately
+ // back on top of it without problem and then update with null to validate
+ String [] argv = getArgv(true, "OverwriteTable", "OverwriteColumnFamily", true, null);
+ String [] types = { "INT", "INT", "INT", "DATETIME" };
+ String [] vals = { "0", "1", "1", "'2017-03-20'" };
+ createTableWithColTypes(types, vals);
+ runImport(argv);
+ verifyHBaseCell("OverwriteTable", "0", "OverwriteColumnFamily", getColName(2), "1");
+ // Run a second time.
+ argv = getIncrementalArgv(true, "OverwriteTable", "OverwriteColumnFamily", true, null, false, false, "DATA_COL3", "2017-03-24 01:01:01.0", null);
+ vals = new String[] { "0", "1", null, "'2017-03-25'" };
+ updateTable(types, vals);
+ runImport(argv);
+ verifyHBaseCell("OverwriteTable", "0", "OverwriteColumnFamily", getColName(2), null);
+ }
+
+ @Test
+ public void testAppendWithTimestampSucceeds() throws IOException {
+ // Test that we can create a table and then import multiple rows
+ // validate for append scenario with time stamp
+ String [] argv = getArgv(true, "AppendTable", "AppendColumnFamily", true, null);
+ String [] types = { "INT", "INT", "INT", "DATETIME" };
+ String [] vals = { "0", "1", "1", "'2017-03-20'" };
+ createTableWithColTypes(types, vals);
+ runImport(argv);
+ verifyHBaseCell("AppendTable", "0", "AppendColumnFamily", getColName(2), "1");
+ // Run a second time.
+ argv = getIncrementalArgv(true, "AppendTable", "AppendColumnFamily", true, null, true, false, "DATA_COL1", "2017-03-24 01:01:01.0", null);
+ vals = new String[] { "1", "2", "3", "'2017-06-15'" };
+ insertIntoTable(types, vals);
+ runImport(argv);
+ verifyHBaseCell("AppendTable", "1", "AppendColumnFamily", getColName(2), "3");
+ }
+
+ @Test
+ public void testAppendSucceeds() throws IOException {
+ // Test that we can create a table and then import multiple rows
+ // validate for append scenario with ID column(DATA_COL3)
+ String [] argv = getArgv(true, "AppendTable", "AppendColumnFamily", true, null);
+ String [] types = { "INT", "INT", "INT", "DATETIME" };
+ String [] vals = { "0", "1", "1", "'2017-03-20'" };
+ createTableWithColTypes(types, vals);
+ runImport(argv);
+ verifyHBaseCell("AppendTable", "0", "AppendColumnFamily", getColName(2), "1");
+ // Run a second time.
+ argv = getIncrementalArgv(true, "AppendTable", "AppendColumnFamily", true, null, true, true, "DATA_COL1", null, "DATA_COL3");
+ vals = new String[] { "1", "2", "3", "'2017-06-15'" };
+ insertIntoTable(types, vals);
+ runImport(argv);
+ verifyHBaseCell("AppendTable", "1", "AppendColumnFamily", getColName(2), "3");
+ }
+
+ @Test
public void testExitFailure() throws IOException {
String [] types = { "INT", "INT", "INT" };
String [] vals = { "0", "42", "43" };
http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba058ac3/src/test/com/cloudera/sqoop/hbase/HBaseTestCase.java
----------------------------------------------------------------------
diff --git a/src/test/com/cloudera/sqoop/hbase/HBaseTestCase.java b/src/test/com/cloudera/sqoop/hbase/HBaseTestCase.java
index a054eb6..d9f7495 100644
--- a/src/test/com/cloudera/sqoop/hbase/HBaseTestCase.java
+++ b/src/test/com/cloudera/sqoop/hbase/HBaseTestCase.java
@@ -18,43 +18,40 @@
package com.cloudera.sqoop.hbase;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+import java.io.File;
import java.io.IOException;
+import java.lang.reflect.Method;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+import org.apache.commons.io.FileUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-
import org.apache.hadoop.conf.Configuration;
-
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MiniHBaseCluster;
-import org.apache.hadoop.hbase.master.HMaster;
-import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
-
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.util.Bytes;
-
+import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.apache.hadoop.util.StringUtils;
-
import org.junit.After;
import org.junit.Before;
import com.cloudera.sqoop.testutil.CommonArgs;
import com.cloudera.sqoop.testutil.HsqldbTestServer;
import com.cloudera.sqoop.testutil.ImportJobTestCase;
-import java.io.File;
-import java.lang.reflect.Method;
-import java.util.UUID;
-import org.apache.commons.io.FileUtils;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
/**
* Utility methods that facilitate HBase import tests.
@@ -115,7 +112,38 @@ public abstract class HBaseTestCase extends ImportJobTestCase {
if (hbaseCreate) {
args.add("--hbase-create-table");
}
+ return args.toArray(new String[0]);
+ }
+
+ /**
+ * Create the argv to pass to Sqoop as incremental options.
+ * @return the argv as an array of strings.
+ */
+ protected String [] getIncrementalArgv(boolean includeHadoopFlags,
+ String hbaseTable, String hbaseColFam, boolean hbaseCreate,
+ String queryStr, boolean isAppend, boolean appendTimestamp, String checkColumn, String checkValue, String lastModifiedColumn) {
+
+ String[] argsStrArray = getArgv(includeHadoopFlags, hbaseTable, hbaseColFam, hbaseCreate, queryStr);
+ List<String> args = new ArrayList<String>(Arrays.asList(argsStrArray));
+ if (isAppend) {
+ args.add("--incremental");
+ args.add("append");
+ if (!appendTimestamp) {
+ args.add("--check-column");
+ args.add(checkColumn);//"ID");
+ } else {
+ args.add("--check-column");
+ args.add(lastModifiedColumn);//LAST_MODIFIED");
+ }
+ } else {
+ args.add("--incremental");
+ args.add("lastmodified");
+ args.add("--check-column");
+ args.add(checkColumn);
+ args.add("--last-value");
+ args.add(checkValue);
+ }
return args.toArray(new String[0]);
}
// Starts a mini hbase cluster in this process.
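The new getIncrementalArgv() helper simply appends Sqoop's incremental-import options to the argument list built by getArgv(). With the values used in the tests above, the two modes correspond roughly to invocations of this shape (connection options omitted; a sketch, not a verbatim command from the patch):

  sqoop import ... --hbase-table OverwriteTable --column-family OverwriteColumnFamily \
      --hbase-create-table --incremental lastmodified \
      --check-column DATA_COL3 --last-value "2017-03-24 01:01:01.0"

  sqoop import ... --hbase-table AppendTable --column-family AppendColumnFamily \
      --hbase-create-table --incremental append --check-column DATA_COL1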
http://git-wip-us.apache.org/repos/asf/sqoop/blob/ba058ac3/src/test/com/cloudera/sqoop/testutil/BaseSqoopTestCase.java
----------------------------------------------------------------------
diff --git a/src/test/com/cloudera/sqoop/testutil/BaseSqoopTestCase.java b/src/test/com/cloudera/sqoop/testutil/BaseSqoopTestCase.java
index 6310a39..8cbb37e 100644
--- a/src/test/com/cloudera/sqoop/testutil/BaseSqoopTestCase.java
+++ b/src/test/com/cloudera/sqoop/testutil/BaseSqoopTestCase.java
@@ -383,7 +383,7 @@ public abstract class BaseSqoopTestCase {
ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
statement.executeUpdate();
} catch (SQLException sqlException) {
- fail("Could not create table: "
+ fail("Could not insert into table: "
+ StringUtils.stringifyException(sqlException));
} finally {
if (null != statement) {
@@ -413,6 +413,139 @@ public abstract class BaseSqoopTestCase {
}
/**
+ * insert into a table with a set of columns values for a given row.
+ * @param colTypes the types of the columns to make
+ * @param vals the SQL text for each value to insert
+ */
+ protected void insertIntoTable(String[] colTypes, String[] vals) {
+ assert colNames != null;
+ assert colNames.length == vals.length;
+
+ Connection conn = null;
+ PreparedStatement statement = null;
+
+ String[] colNames = new String[vals.length];
+ for( int i = 0; i < vals.length; i++) {
+ colNames[i] = BASE_COL_NAME + Integer.toString(i);
+ }
+ try {
+ conn = getManager().getConnection();
+ for (int count=0; vals != null && count < vals.length/colTypes.length;
+ ++count ) {
+ String columnListStr = "";
+ String valueListStr = "";
+ for (int i = 0; i < colTypes.length; i++) {
+ columnListStr += manager.escapeColName(colNames[i].toUpperCase());
+ valueListStr += vals[count * colTypes.length + i];
+ if (i < colTypes.length - 1) {
+ columnListStr += ", ";
+ valueListStr += ", ";
+ }
+ }
+ try {
+ String insertValsStr = "INSERT INTO " + manager.escapeTableName(getTableName()) + "(" + columnListStr + ")"
+ + " VALUES(" + valueListStr + ")";
+ LOG.info("Inserting values: " + insertValsStr);
+ statement = conn.prepareStatement(
+ insertValsStr,
+ ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+ statement.executeUpdate();
+ } catch (SQLException sqlException) {
+ fail("Could not insert into table: "
+ + StringUtils.stringifyException(sqlException));
+ } finally {
+ if (null != statement) {
+ try {
+ statement.close();
+ } catch (SQLException se) {
+ // Ignore exception on close.
+ }
+
+ statement = null;
+ }
+ }
+ }
+ conn.commit();
+ this.colNames = colNames;
+ } catch (SQLException se) {
+ if (null != conn) {
+ try {
+ conn.close();
+ } catch (SQLException connSE) {
+ // Ignore exception on close.
+ }
+ }
+ fail("Could not create table: " + StringUtils.stringifyException(se));
+ }
+
+ }
+
+ /**
+ * update a table with a set of columns values for a given row.
+ * @param colTypes the types of the columns to make
+ * @param vals the SQL text for each value to insert
+ */
+ protected void updateTable(String[] colTypes, String[] vals) {
+ assert colNames != null;
+ assert colNames.length == vals.length;
+
+ Connection conn = null;
+ PreparedStatement statement = null;
+
+ String[] colNames = new String[vals.length];
+ for( int i = 0; i < vals.length; i++) {
+ colNames[i] = BASE_COL_NAME + Integer.toString(i);
+ }
+
+ try {
+ conn = getManager().getConnection();
+ for (int count=0; vals != null && count < vals.length/colNames.length;
+ ++count ) {
+ String updateStr = "";
+ for (int i = 1; i < colNames.length; i++) {
+ updateStr += manager.escapeColName(colNames[i].toUpperCase()) + " = "+vals[count * colNames.length + i];
+ if (i < colNames.length - 1) {
+ updateStr += ", ";
+ }
+ }
+ updateStr += " WHERE "+colNames[0]+"="+vals[0]+"";
+ try {
+ String updateValsStr = "UPDATE " + manager.escapeTableName(getTableName()) + " SET " + updateStr;
+ LOG.info("updating values: " + updateValsStr);
+ statement = conn.prepareStatement(
+ updateValsStr,
+ ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
+ statement.executeUpdate();
+ } catch (SQLException sqlException) {
+ fail("Could not update table: "
+ + StringUtils.stringifyException(sqlException));
+ } finally {
+ if (null != statement) {
+ try {
+ statement.close();
+ } catch (SQLException se) {
+ // Ignore exception on close.
+ }
+ statement = null;
+ }
+ }
+ }
+
+ conn.commit();
+ this.colNames = colNames;
+ } catch (SQLException se) {
+ if (null != conn) {
+ try {
+ conn.close();
+ } catch (SQLException connSE) {
+ // Ignore exception on close.
+ }
+ }
+ fail("Could not update table: " + StringUtils.stringifyException(se));
+ }
+ }
+
+ /**
* Create a table with a set of columns and add a row of values.
* @param colTypes the types of the columns to make
* @param vals the SQL text for each value to insert