You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ss...@apache.org on 2016/06/07 06:43:37 UTC
[2/4] phoenix git commit: PHOENIX-2967 CSV BulkLoad should properly handle empty family for logical tables.
PHOENIX-2967 CSV BulkLoad should properly handle empty family for logical tables.
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/6a45977f
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/6a45977f
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/6a45977f
Branch: refs/heads/4.x-HBase-1.1
Commit: 6a45977fbb04a17ddadad432d0c511104ce9dd91
Parents: f12c196
Author: Sergey Soldatov <ss...@apache.org>
Authored: Mon Jun 6 14:30:04 2016 -0700
Committer: Sergey Soldatov <ss...@apache.org>
Committed: Mon Jun 6 23:39:34 2016 -0700
----------------------------------------------------------------------
.../mapreduce/FormatToBytesWritableMapper.java | 15 +++++++---
.../mapreduce/FormatToKeyValueReducer.java | 30 +++++++-------------
2 files changed, 22 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6a45977f/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
index eb0e3ed..a736fc4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
@@ -44,11 +44,13 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.mapreduce.bulkload.TableRowkeyPair;
import org.apache.phoenix.mapreduce.bulkload.TargetTableRefFunctions;
import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.schema.PColumn;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.util.ColumnInfo;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.UpsertExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -216,12 +218,17 @@ public abstract class FormatToBytesWritableMapper<RECORD> extends Mapper<LongWri
if (c.getFamilyName() != null) // Skip PK column
family = c.getFamilyName().getBytes();
byte[] name = c.getName().getBytes();
- byte[] cfn = Bytes.add(family,":".getBytes(), name);
+ byte[] cfn = Bytes.add(family, QueryConstants.NAMESPACE_SEPARATOR_BYTES, name);
if (!columnIndexes.containsKey(cfn)) {
columnIndexes.put(cfn, new Integer(columnIndex));
columnIndex++;
}
}
+ byte[] emptyColumnFamily = SchemaUtil.getEmptyColumnFamily(table);
+ byte[] cfn = Bytes.add(emptyColumnFamily, QueryConstants.NAMESPACE_SEPARATOR_BYTES,
+ QueryConstants.EMPTY_COLUMN_BYTES);
+ columnIndexes.put(cfn, new Integer(columnIndex));
+ columnIndex++;
}
}
@@ -232,16 +239,16 @@ public abstract class FormatToBytesWritableMapper<RECORD> extends Mapper<LongWri
* @param cell KeyValue for the column
* @return column index for the specified cell or -1 if was not found
*/
- private int findIndex(Cell cell) {
+ private int findIndex(Cell cell) throws IOException {
byte[] familyName = Bytes.copy(cell.getFamilyArray(), cell.getFamilyOffset(),
cell.getFamilyLength());
byte[] name = Bytes.copy(cell.getQualifierArray(), cell.getQualifierOffset(),
cell.getQualifierLength());
- byte[] cfn = Bytes.add(familyName, ":".getBytes(), name);
+ byte[] cfn = Bytes.add(familyName, QueryConstants.NAMESPACE_SEPARATOR_BYTES, name);
if(columnIndexes.containsKey(cfn)) {
return columnIndexes.get(cfn);
}
- return -1;
+ throw new IOException("Unable to map cell to column index");
}
/**
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6a45977f/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java
index aa807c4..15d6d2f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java
@@ -36,7 +36,6 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.mapreduce.bulkload.TableRowkeyPair;
@@ -65,7 +64,6 @@ public class FormatToKeyValueReducer
protected List<String> logicalNames;
protected KeyValueBuilder builder;
private Map<Integer, Pair<byte[], byte[]>> columnIndexes;
- private Map<String, ImmutableBytesPtr> emptyFamilyName;
@Override
@@ -91,13 +89,11 @@ public class FormatToKeyValueReducer
}
private void initColumnsMap(PhoenixConnection conn) throws SQLException {
- Map <byte[], Integer> indexMap = new TreeMap(Bytes.BYTES_COMPARATOR);
- emptyFamilyName = new HashMap<>();
+ Map<byte[], Integer> indexMap = new TreeMap(Bytes.BYTES_COMPARATOR);
columnIndexes = new HashMap<>();
int columnIndex = 0;
- for(int index = 0; index < logicalNames.size(); index++) {
+ for (int index = 0; index < logicalNames.size(); index++) {
PTable table = PhoenixRuntime.getTable(conn, logicalNames.get(index));
- emptyFamilyName.put(tableNames.get(index), SchemaUtil.getEmptyColumnFamilyPtr(table));
List<PColumn> cls = table.getColumns();
for (int i = 0; i < cls.size(); i++) {
PColumn c = cls.get(i);
@@ -106,7 +102,7 @@ public class FormatToKeyValueReducer
family = c.getFamilyName().getBytes();
}
byte[] name = c.getName().getBytes();
- byte[] cfn = Bytes.add(family,":".getBytes(), name);
+ byte[] cfn = Bytes.add(family, QueryConstants.NAMESPACE_SEPARATOR_BYTES, name);
Pair<byte[], byte[]> pair = new Pair(family, name);
if (!indexMap.containsKey(cfn)) {
indexMap.put(cfn, new Integer(columnIndex));
@@ -114,6 +110,11 @@ public class FormatToKeyValueReducer
columnIndex++;
}
}
+ byte[] emptyColumnFamily = SchemaUtil.getEmptyColumnFamily(table);
+ Pair<byte[], byte[]> pair = new Pair(emptyColumnFamily, QueryConstants
+ .EMPTY_COLUMN_BYTES);
+ columnIndexes.put(new Integer(columnIndex), pair);
+ columnIndex++;
}
}
@@ -131,18 +132,9 @@ public class FormatToKeyValueReducer
ImmutableBytesWritable family;
ImmutableBytesWritable name;
ImmutableBytesWritable value = QueryConstants.EMPTY_COLUMN_VALUE_BYTES_PTR;
- if (index == -1) {
- family = emptyFamilyName.get(key.getTableName());
- name = QueryConstants.EMPTY_COLUMN_BYTES_PTR;
- } else {
- Pair<byte[], byte[]> pair = columnIndexes.get(index);
- if(pair.getFirst() != null) {
- family = new ImmutableBytesWritable(pair.getFirst());
- } else {
- family = emptyFamilyName.get(key.getTableName());
- }
- name = new ImmutableBytesWritable(pair.getSecond());
- }
+ Pair<byte[], byte[]> pair = columnIndexes.get(index);
+ family = new ImmutableBytesWritable(pair.getFirst());
+ name = new ImmutableBytesWritable(pair.getSecond());
int len = WritableUtils.readVInt(input);
if (len > 0) {
byte[] array = new byte[len];