You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ss...@apache.org on 2016/06/07 06:43:37 UTC

[2/4] phoenix git commit: PHOENIX-2967 CSV BulkLoad should properly handle empty family for logical tables.

PHOENIX-2967 CSV BulkLoad should properly handle empty family for logical tables.


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/6a45977f
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/6a45977f
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/6a45977f

Branch: refs/heads/4.x-HBase-1.1
Commit: 6a45977fbb04a17ddadad432d0c511104ce9dd91
Parents: f12c196
Author: Sergey Soldatov <ss...@apache.org>
Authored: Mon Jun 6 14:30:04 2016 -0700
Committer: Sergey Soldatov <ss...@apache.org>
Committed: Mon Jun 6 23:39:34 2016 -0700

----------------------------------------------------------------------
 .../mapreduce/FormatToBytesWritableMapper.java  | 15 +++++++---
 .../mapreduce/FormatToKeyValueReducer.java      | 30 +++++++-------------
 2 files changed, 22 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/6a45977f/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
index eb0e3ed..a736fc4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
@@ -44,11 +44,13 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.mapreduce.bulkload.TableRowkeyPair;
 import org.apache.phoenix.mapreduce.bulkload.TargetTableRefFunctions;
 import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.PColumn;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.util.ColumnInfo;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.UpsertExecutor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -216,12 +218,17 @@ public abstract class FormatToBytesWritableMapper<RECORD> extends Mapper<LongWri
                 if (c.getFamilyName() != null)  // Skip PK column
                     family = c.getFamilyName().getBytes();
                 byte[] name = c.getName().getBytes();
-                byte[] cfn = Bytes.add(family,":".getBytes(), name);
+                byte[] cfn = Bytes.add(family, QueryConstants.NAMESPACE_SEPARATOR_BYTES, name);
                 if (!columnIndexes.containsKey(cfn)) {
                     columnIndexes.put(cfn, new Integer(columnIndex));
                     columnIndex++;
                 }
             }
+            byte[] emptyColumnFamily = SchemaUtil.getEmptyColumnFamily(table);
+            byte[] cfn = Bytes.add(emptyColumnFamily, QueryConstants.NAMESPACE_SEPARATOR_BYTES,
+                    QueryConstants.EMPTY_COLUMN_BYTES);
+            columnIndexes.put(cfn, new Integer(columnIndex));
+            columnIndex++;
         }
     }
 
@@ -232,16 +239,16 @@ public abstract class FormatToBytesWritableMapper<RECORD> extends Mapper<LongWri
      * @param cell       KeyValue for the column
      * @return column index for the specified cell or -1 if was not found
      */
-    private int findIndex(Cell cell) {
+    private int findIndex(Cell cell) throws IOException {
         byte[] familyName = Bytes.copy(cell.getFamilyArray(), cell.getFamilyOffset(),
                 cell.getFamilyLength());
         byte[] name = Bytes.copy(cell.getQualifierArray(), cell.getQualifierOffset(),
                 cell.getQualifierLength());
-        byte[] cfn = Bytes.add(familyName, ":".getBytes(), name);
+        byte[] cfn = Bytes.add(familyName, QueryConstants.NAMESPACE_SEPARATOR_BYTES, name);
         if(columnIndexes.containsKey(cfn)) {
             return columnIndexes.get(cfn);
         }
-        return -1;
+        throw new IOException("Unable to map cell to column index");
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/phoenix/blob/6a45977f/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java
index aa807c4..15d6d2f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java
@@ -36,7 +36,6 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.mapreduce.bulkload.TableRowkeyPair;
@@ -65,7 +64,6 @@ public class FormatToKeyValueReducer
     protected List<String> logicalNames;
     protected KeyValueBuilder builder;
     private Map<Integer, Pair<byte[], byte[]>> columnIndexes;
-    private Map<String, ImmutableBytesPtr> emptyFamilyName;
 
 
     @Override
@@ -91,13 +89,11 @@ public class FormatToKeyValueReducer
     }
 
     private void initColumnsMap(PhoenixConnection conn) throws SQLException {
-        Map <byte[], Integer> indexMap = new TreeMap(Bytes.BYTES_COMPARATOR);
-        emptyFamilyName = new HashMap<>();
+        Map<byte[], Integer> indexMap = new TreeMap(Bytes.BYTES_COMPARATOR);
         columnIndexes = new HashMap<>();
         int columnIndex = 0;
-        for(int index = 0; index < logicalNames.size(); index++) {
+        for (int index = 0; index < logicalNames.size(); index++) {
             PTable table = PhoenixRuntime.getTable(conn, logicalNames.get(index));
-            emptyFamilyName.put(tableNames.get(index), SchemaUtil.getEmptyColumnFamilyPtr(table));
             List<PColumn> cls = table.getColumns();
             for (int i = 0; i < cls.size(); i++) {
                 PColumn c = cls.get(i);
@@ -106,7 +102,7 @@ public class FormatToKeyValueReducer
                     family = c.getFamilyName().getBytes();
                 }
                 byte[] name = c.getName().getBytes();
-                byte[] cfn = Bytes.add(family,":".getBytes(), name);
+                byte[] cfn = Bytes.add(family, QueryConstants.NAMESPACE_SEPARATOR_BYTES, name);
                 Pair<byte[], byte[]> pair = new Pair(family, name);
                 if (!indexMap.containsKey(cfn)) {
                     indexMap.put(cfn, new Integer(columnIndex));
@@ -114,6 +110,11 @@ public class FormatToKeyValueReducer
                     columnIndex++;
                 }
             }
+            byte[] emptyColumnFamily = SchemaUtil.getEmptyColumnFamily(table);
+            Pair<byte[], byte[]> pair = new Pair(emptyColumnFamily, QueryConstants
+                    .EMPTY_COLUMN_BYTES);
+            columnIndexes.put(new Integer(columnIndex), pair);
+            columnIndex++;
         }
     }
 
@@ -131,18 +132,9 @@ public class FormatToKeyValueReducer
                 ImmutableBytesWritable family;
                 ImmutableBytesWritable name;
                 ImmutableBytesWritable value = QueryConstants.EMPTY_COLUMN_VALUE_BYTES_PTR;
-                if (index == -1) {
-                    family = emptyFamilyName.get(key.getTableName());
-                    name = QueryConstants.EMPTY_COLUMN_BYTES_PTR;
-                } else {
-                    Pair<byte[], byte[]> pair = columnIndexes.get(index);
-                    if(pair.getFirst() != null) {
-                        family = new ImmutableBytesWritable(pair.getFirst());
-                    } else {
-                        family = emptyFamilyName.get(key.getTableName());
-                    }
-                    name = new ImmutableBytesWritable(pair.getSecond());
-                }
+                Pair<byte[], byte[]> pair = columnIndexes.get(index);
+                family = new ImmutableBytesWritable(pair.getFirst());
+                name = new ImmutableBytesWritable(pair.getSecond());
                 int len = WritableUtils.readVInt(input);
                 if (len > 0) {
                     byte[] array = new byte[len];