You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ja...@apache.org on 2013/06/21 18:08:11 UTC

git commit: SQOOP-1038: Add support for composite keys in HBase import

Updated Branches:
  refs/heads/trunk 217e2af3f -> a2a02076a


SQOOP-1038: Add support for composite keys in HBase import

(Shruti Joshi via Jarek Jarcec Cecho)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/a2a02076
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/a2a02076
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/a2a02076

Branch: refs/heads/trunk
Commit: a2a02076a9c94f344332864471894fa866c11806
Parents: 217e2af
Author: Jarek Jarcec Cecho <ja...@apache.org>
Authored: Fri Jun 21 09:07:24 2013 -0700
Committer: Jarek Jarcec Cecho <ja...@apache.org>
Committed: Fri Jun 21 09:07:24 2013 -0700

----------------------------------------------------------------------
 src/docs/man/hbase-args.txt                     |   3 +
 src/docs/user/hbase-args.txt                    |   4 +
 src/docs/user/hbase.txt                         |   7 ++
 .../apache/sqoop/hbase/HBasePutProcessor.java   |   1 +
 .../sqoop/hbase/ToStringPutTransformer.java     | 117 ++++++++++++++++---
 .../sqoop/hbase/HBaseImportAddRowKeyTest.java   |  22 ++++
 6 files changed, 139 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/a2a02076/src/docs/man/hbase-args.txt
----------------------------------------------------------------------
diff --git a/src/docs/man/hbase-args.txt b/src/docs/man/hbase-args.txt
index 7164e93..456bc14 100644
--- a/src/docs/man/hbase-args.txt
+++ b/src/docs/man/hbase-args.txt
@@ -29,6 +29,9 @@ HBase options
 
 --hbase-row-key (col)::
   Specifies which input column to use as the row key
+  If the input table contains a composite key, the value
+  of (col) must be a comma-separated list of the composite
+  key attributes.
 
 --hbase-table (table-name)::
   Specifies an HBase table to use as the target instead of HDFS

http://git-wip-us.apache.org/repos/asf/sqoop/blob/a2a02076/src/docs/user/hbase-args.txt
----------------------------------------------------------------------
diff --git a/src/docs/user/hbase-args.txt b/src/docs/user/hbase-args.txt
index 36b930b..8ba23eb 100644
--- a/src/docs/user/hbase-args.txt
+++ b/src/docs/user/hbase-args.txt
@@ -27,6 +27,10 @@ Argument                      Description
 +\--hbase-create-table+       If specified, create missing HBase tables
 +\--hbase-row-key <col>+      Specifies which input column to use as the\
                               row key
+                              If the input table contains a composite
+                              key, <col> must be in the form of a
+                              comma-separated list of composite key
+                              attributes
 +\--hbase-table <table-name>+ Specifies an HBase table to use as the \
                               target instead of HDFS
 --------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/sqoop/blob/a2a02076/src/docs/user/hbase.txt
----------------------------------------------------------------------
diff --git a/src/docs/user/hbase.txt b/src/docs/user/hbase.txt
index 24c8df8..34f9875 100644
--- a/src/docs/user/hbase.txt
+++ b/src/docs/user/hbase.txt
@@ -39,6 +39,13 @@ with +\--column-family+.
 NOTE: This function is incompatible with direct import (parameter 
 +\--direct+).
 
+If the input table has a composite key, the +\--hbase-row-key+ must be
+in the form of a comma-separated list of composite key attributes.
+In this case, the row key for the HBase row will be generated by combining
+the values of the composite key attributes using an underscore as a separator.
+NOTE: Sqoop import for a table with a composite key will work only if
+the parameter +\--hbase-row-key+ has been specified.
+
 If the target table and column family do not exist, the Sqoop job will
 exit with an error. You should create the target table and column family
 before running an import. If you specify +\--hbase-create-table+, Sqoop

http://git-wip-us.apache.org/repos/asf/sqoop/blob/a2a02076/src/java/org/apache/sqoop/hbase/HBasePutProcessor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/hbase/HBasePutProcessor.java b/src/java/org/apache/sqoop/hbase/HBasePutProcessor.java
index 6aca97f..9ceb5bd 100644
--- a/src/java/org/apache/sqoop/hbase/HBasePutProcessor.java
+++ b/src/java/org/apache/sqoop/hbase/HBasePutProcessor.java
@@ -112,6 +112,7 @@ public class HBasePutProcessor implements Closeable, Configurable,
       stringPutTransformer.addRowKey =
           conf.getBoolean(HBasePutProcessor.ADD_ROW_KEY,
               HBasePutProcessor.ADD_ROW_KEY_DEFAULT);
+      stringPutTransformer.detectCompositeKey();
     }
 
     this.tableName = conf.get(TABLE_NAME_KEY, null);

http://git-wip-us.apache.org/repos/asf/sqoop/blob/a2a02076/src/java/org/apache/sqoop/hbase/ToStringPutTransformer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/sqoop/hbase/ToStringPutTransformer.java b/src/java/org/apache/sqoop/hbase/ToStringPutTransformer.java
index 13c765c..5ccf311 100644
--- a/src/java/org/apache/sqoop/hbase/ToStringPutTransformer.java
+++ b/src/java/org/apache/sqoop/hbase/ToStringPutTransformer.java
@@ -20,6 +20,8 @@ package org.apache.sqoop.hbase;
 
 import java.io.IOException;
 import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -29,6 +31,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.util.StringUtils;
 
 import com.cloudera.sqoop.hbase.PutTransformer;
 
@@ -45,13 +48,27 @@ public class ToStringPutTransformer extends PutTransformer {
   private Map<String, byte[]> serializedFieldNames;
   protected boolean bigDecimalFormatString;
   protected boolean addRowKey;
+  private boolean isCompositeKey = false;
+  private List<String> compositeKeyAttributes;
+
+  /**
+   * Delimiter between composite-key column names on the command line.
+   * Used when several columns are passed as the --hbase-row-key argument.
+   */
+  public static final String DELIMITER_COMMAND_LINE = ",";
+
+  /**
+   * Connecting string placed between composite-key values.
+   * The joined values form the composite row key stored in HBase.
+   */
+  public static final String DELIMITER_HBASE = "_";
 
   public ToStringPutTransformer() {
     serializedFieldNames = new TreeMap<String, byte[]>();
   }
 
   /**
-   * Return the serialized bytes for a field name, using
+   * Return the serialized bytes for a field name, using.
    * the cache if it's already in there.
    */
   private byte [] getFieldNameBytes(String fieldName) {
@@ -67,30 +84,101 @@ public class ToStringPutTransformer extends PutTransformer {
     return nameBytes;
   }
 
+  /**
+   * Detects whether the --hbase-row-key parameter is a composite key.
+   * A composite key is given as a comma-separated list of attributes.
+   */
+  public void detectCompositeKey() {
+    String rowKeyCol = getRowKeyColumn();
+    if (null != rowKeyCol && rowKeyCol.contains(DELIMITER_COMMAND_LINE)) {
+      // Set the flag as true
+      isCompositeKey = true;
+      String[] compositeKeyArray = rowKeyCol.split(DELIMITER_COMMAND_LINE);
+      compositeKeyAttributes = Arrays.asList(compositeKeyArray);
+    }
+  }
+
   @Override
   /** {@inheritDoc} */
   public List<Put> getPutCommand(Map<String, Object> fields)
       throws IOException {
 
     String rowKeyCol = getRowKeyColumn();
+    if (null == rowKeyCol) {
+      throw new IOException("Row key column can't be NULL.");
+    }
+
     String colFamily = getColumnFamily();
-    byte [] colFamilyBytes = Bytes.toBytes(colFamily);
-
-    Object rowKey = fields.get(rowKeyCol);
-    if (null == rowKey) {
-      // If the row-key column is null, we don't insert this row.
-      LOG.warn("Could not insert row with null value for row-key column: "
-          + rowKeyCol);
-      return null;
+    if (null == colFamily) {
+      throw new IOException("Column family can't be NULL.");
     }
 
-    Put put = new Put(Bytes.toBytes(toHBaseString(rowKey)));
+    if (isCompositeKey) {
+      // Indicates row-key is a composite key (multiple attribute key)
+      List<String> rowKeyList = new ArrayList<String>();
+
+      // storing each comma-separated attribute into list
+      for (String fieldName : compositeKeyAttributes) {
+        Object fieldValue = fields.get(fieldName);
+        if (null == fieldValue) {
+          // A null row-key value cannot be used as a key, so throw instead.
+          throw new IOException("Could not insert row with null "
+            + "value for row-key column: " + fieldName);
+        }
+        String rowKey = toHBaseString(fieldValue);
+        // inserting value of each attribute (rowKey) into list
+        rowKeyList.add(rowKey);
+      }
+
+      // construct rowKey by combining attribute values
+      // from composite key
+      String compositeRowKey = StringUtils.join(DELIMITER_HBASE, rowKeyList);
+      // Insert record in HBase
+      return putRecordInHBase(fields, colFamily, compositeRowKey);
+
+    } else {
+      // if row-key is regular primary key
+      // i.e. it contains only one attribute
+
+      Object rowKey = fields.get(rowKeyCol);
+      if (null == rowKey) {
+        // A null row-key value cannot be used as a key, so throw instead.
+        throw new IOException("Could not insert row with null "
+          + "value for row-key column: " + rowKeyCol);
+      }
+
+      String hBaseRowKey = toHBaseString(rowKey);
+      return putRecordInHBase(fields, colFamily, hBaseRowKey);
+   }
+ }
 
-    for (Map.Entry<String, Object> fieldEntry : fields.entrySet()) {
+  /**
+   * Builds the HBase Put command for the specified record.
+   * @param record
+   * @param colFamily
+   * @param rowKey
+   * @return List containing a single put command
+   */
+  private List<Put> putRecordInHBase(Map<String, Object> record,
+    String colFamily, String rowKey) {
+    // Put row-key in HBase
+    Put put = new Put(Bytes.toBytes(rowKey));
+    byte[] colFamilyBytes = Bytes.toBytes(colFamily);
+
+    for (Map.Entry<String, Object> fieldEntry : record.entrySet()) {
       String colName = fieldEntry.getKey();
-      if (!colName.equals(rowKeyCol) || addRowKey) {
-        // This is a regular field, not the row key.
-        // Add it if it's not null.
+      boolean rowKeyCol = false;
+      /*
+       * For both composite key and normal primary key,
+       * check if colName is part of rowKey.
+       */
+      if ((isCompositeKey && compositeKeyAttributes.contains(colName))
+        || colName.equals(getRowKeyColumn())) {
+        rowKeyCol = true;
+      }
+
+      if (!rowKeyCol || addRowKey) {
+        // check addRowKey flag before including rowKey field.
         Object val = fieldEntry.getValue();
         if (null != val) {
           put.add(colFamilyBytes, getFieldNameBytes(colName),
@@ -98,7 +186,6 @@ public class ToStringPutTransformer extends PutTransformer {
         }
       }
     }
-
     return Collections.singletonList(put);
   }
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/a2a02076/src/test/com/cloudera/sqoop/hbase/HBaseImportAddRowKeyTest.java
----------------------------------------------------------------------
diff --git a/src/test/com/cloudera/sqoop/hbase/HBaseImportAddRowKeyTest.java b/src/test/com/cloudera/sqoop/hbase/HBaseImportAddRowKeyTest.java
index bd1cd2d..cfbb1d3 100644
--- a/src/test/com/cloudera/sqoop/hbase/HBaseImportAddRowKeyTest.java
+++ b/src/test/com/cloudera/sqoop/hbase/HBaseImportAddRowKeyTest.java
@@ -60,4 +60,26 @@ public class HBaseImportAddRowKeyTest extends HBaseTestCase {
     verifyHBaseCell("addRowKeyDfT", "0", "addRowKeyDfF", getColName(0), null);
     verifyHBaseCell("addRowKeyDfT", "0", "addRowKeyDfF", getColName(1), "1");
   }
+
+  @Test
+  public void testAddCompositeKey() throws IOException {
+    String[] types = { "INT", "INT" };
+    String[] vals = { "0", "1" };
+    createTableWithColTypes(types, vals);
+
+    String[] otherArg = getArgv(true, "addRowKeyT", "addRowKeyF", true, null);
+    String[] argv = new String[otherArg.length + 4];
+    argv[0]="-D";
+    argv[1]="sqoop.hbase.add.row.key=true";
+    System.arraycopy(otherArg, 0, argv, 2, otherArg.length);
+    argv[argv.length - 2] = "--hbase-row-key";
+    argv[argv.length - 1] = getColName(0)+","+getColName(1);
+
+    runImport(argv);
+
+    // Row key should have been added
+    verifyHBaseCell("addRowKeyT", "0_1", "addRowKeyF", getColName(0), "0");
+    verifyHBaseCell("addRowKeyT", "0_1", "addRowKeyF", getColName(1), "1");
+  }
+
 }