You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ss...@apache.org on 2016/05/23 20:10:20 UTC
[1/4] phoenix git commit: PHOENIX-2925 CsvBulkloadTool not working
properly if there are multiple local indexes to the same table(After
PHOENIX-1973)
Repository: phoenix
Updated Branches:
refs/heads/4.x-HBase-0.98 09747fc42 -> 9569cb521
refs/heads/4.x-HBase-1.0 4d07528a7 -> ec44a57cc
refs/heads/4.x-HBase-1.1 e36c9c62e -> 05dc0e320
refs/heads/master 789e66560 -> 18da4a046
PHOENIX-2925 CsvBulkloadTool not working properly if there are multiple local indexes to the same table(After PHOENIX-1973)
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/18da4a04
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/18da4a04
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/18da4a04
Branch: refs/heads/master
Commit: 18da4a0460967ce28fcdeaad4a1b90440ff36f00
Parents: 789e665
Author: Sergey Soldatov <se...@gmail.com>
Authored: Mon May 23 02:22:45 2016 -0700
Committer: Sergey Soldatov <se...@gmail.com>
Committed: Mon May 23 10:38:09 2016 -0700
----------------------------------------------------------------------
.../phoenix/end2end/CsvBulkLoadToolIT.java | 4 +
.../mapreduce/FormatToBytesWritableMapper.java | 111 +++++++++----------
.../mapreduce/FormatToKeyValueReducer.java | 75 +++++++------
3 files changed, 100 insertions(+), 90 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/18da4a04/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java
index 1e9c1d9..8968555 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java
@@ -277,6 +277,10 @@ public class CsvBulkLoadToolIT extends BaseOwnClusterHBaseManagedTimeIT {
assertEquals(2, rs.getInt(1));
assertEquals("FirstName 2", rs.getString(2));
+ rs = stmt.executeQuery("SELECT LAST_NAME FROM TABLE6 where last_name='LastName 1'");
+ assertTrue(rs.next());
+ assertEquals("LastName 1", rs.getString(1));
+
rs.close();
stmt.close();
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/18da4a04/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
index ff21d4f..eb0e3ed 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
@@ -44,7 +44,6 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.mapreduce.bulkload.TableRowkeyPair;
import org.apache.phoenix.mapreduce.bulkload.TargetTableRefFunctions;
import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
-import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.schema.PColumn;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.util.ColumnInfo;
@@ -108,7 +107,7 @@ public abstract class FormatToBytesWritableMapper<RECORD> extends Mapper<LongWri
/*
lookup table for column index. Index in the List matches to the index in tableNames List
*/
- protected List<Map<byte[], Map<byte[], Integer>>> columnIndexes;
+ protected Map<byte[], Integer> columnIndexes;
protected abstract UpsertExecutor<RECORD,?> buildUpsertExecutor(Configuration conf);
protected abstract LineParser<RECORD> getLineParser();
@@ -135,7 +134,7 @@ public abstract class FormatToBytesWritableMapper<RECORD> extends Mapper<LongWri
tableNames = TargetTableRefFunctions.NAMES_FROM_JSON.apply(tableNamesConf);
logicalNames = TargetTableRefFunctions.NAMES_FROM_JSON.apply(logicalNamesConf);
- columnIndexes = initColumnIndexes();
+ initColumnIndexes();
} catch (SQLException | ClassNotFoundException e) {
throw new RuntimeException(e);
}
@@ -193,7 +192,7 @@ public abstract class FormatToBytesWritableMapper<RECORD> extends Mapper<LongWri
int tableIndex = rowEntry.getKey();
List<KeyValue> lkv = rowEntry.getValue();
// All KV values combines to a single byte array
- writeAggregatedRow(context, tableIndex, lkv);
+ writeAggregatedRow(context, tableNames.get(tableIndex), lkv);
}
conn.rollback();
} catch (Exception e) {
@@ -201,99 +200,99 @@ public abstract class FormatToBytesWritableMapper<RECORD> extends Mapper<LongWri
}
}
- private List<Map<byte[], Map<byte[], Integer>>> initColumnIndexes() throws SQLException {
- List<Map<byte[], Map<byte[], Integer>>> tableMap = new ArrayList<>();
- int tableIndex;
- for (tableIndex = 0; tableIndex < tableNames.size(); tableIndex++) {
- PTable table = PhoenixRuntime.getTable(conn, logicalNames.get(tableIndex));
- Map<byte[], Map<byte[], Integer>> columnMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+ /*
+ Map all unique pairs <family, name> to index. Table name is part of TableRowkey, so we do
+ not care about it
+ */
+ private void initColumnIndexes() throws SQLException {
+ columnIndexes = new TreeMap(Bytes.BYTES_COMPARATOR);
+ int columnIndex = 0;
+ for(int index = 0; index < logicalNames.size(); index++) {
+ PTable table = PhoenixRuntime.getTable(conn, logicalNames.get(index));
List<PColumn> cls = table.getColumns();
for (int i = 0; i < cls.size(); i++) {
PColumn c = cls.get(i);
- if (c.getFamilyName() == null) continue; // Skip PK column
- byte[] family = c.getFamilyName().getBytes();
+ byte[] family = new byte[0];
+ if (c.getFamilyName() != null) // Skip PK column
+ family = c.getFamilyName().getBytes();
byte[] name = c.getName().getBytes();
- if (!columnMap.containsKey(family)) {
- columnMap.put(family, new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR));
+ byte[] cfn = Bytes.add(family,":".getBytes(), name);
+ if (!columnIndexes.containsKey(cfn)) {
+ columnIndexes.put(cfn, new Integer(columnIndex));
+ columnIndex++;
}
- Map<byte[], Integer> qualifier = columnMap.get(family);
- qualifier.put(name, i);
}
- tableMap.add(columnMap);
}
- return tableMap;
}
/**
* Find the column index which will replace the column name in
* the aggregated array and will be restored in Reducer
*
- * @param tableIndex Table index in tableNames list
* @param cell KeyValue for the column
* @return column index for the specified cell or -1 if was not found
*/
- private int findIndex(int tableIndex, Cell cell) {
- Map<byte[], Map<byte[], Integer>> columnMap = columnIndexes.get(tableIndex);
- Map<byte[], Integer> qualifiers = columnMap.get(Bytes.copy(cell.getFamilyArray(),
- cell.getFamilyOffset(), cell.getFamilyLength()));
- if (qualifiers != null) {
- Integer result = qualifiers.get(Bytes.copy(cell.getQualifierArray(),
- cell.getQualifierOffset(), cell.getQualifierLength()));
- if (result != null) {
- return result;
- }
+ private int findIndex(Cell cell) {
+ byte[] familyName = Bytes.copy(cell.getFamilyArray(), cell.getFamilyOffset(),
+ cell.getFamilyLength());
+ byte[] name = Bytes.copy(cell.getQualifierArray(), cell.getQualifierOffset(),
+ cell.getQualifierLength());
+ byte[] cfn = Bytes.add(familyName, ":".getBytes(), name);
+ if(columnIndexes.containsKey(cfn)) {
+ return columnIndexes.get(cfn);
}
return -1;
}
/**
- * Collect all column values for the same rowKey
+ * Collect all column values for the same Row. RowKey may be different if indexes are involved,
+ * so it writes a separate record for each unique RowKey
*
* @param context Current mapper context
- * @param tableIndex Table index in tableNames list
+ * @param tableName Name of the target table (an entry of the tableNames list)
* @param lkv List of KV values that will be combined in a single ImmutableBytesWritable
* @throws IOException
* @throws InterruptedException
*/
- private void writeAggregatedRow(Context context, int tableIndex, List<KeyValue> lkv)
+ private void writeAggregatedRow(Context context, String tableName, List<KeyValue> lkv)
throws IOException, InterruptedException {
ByteArrayOutputStream bos = new ByteArrayOutputStream(1024);
DataOutputStream outputStream = new DataOutputStream(bos);
- ImmutableBytesWritable outputKey = new ImmutableBytesWritable();
+ ImmutableBytesWritable outputKey =null;
if (!lkv.isEmpty()) {
- // All Key Values for the same row are supposed to be the same, so init rowKey only once
- Cell first = lkv.get(0);
- outputKey.set(first.getRowArray(), first.getRowOffset(), first.getRowLength());
for (KeyValue cell : lkv) {
- if (isEmptyCell(cell)) {
- continue;
- }
- int i = findIndex(tableIndex, cell);
- if (i == -1) {
- throw new IOException("No column found for KeyValue");
+ if (outputKey == null || Bytes.compareTo(outputKey.get(), outputKey.getOffset(),
+ outputKey.getLength(), cell.getRowArray(), cell.getRowOffset(), cell
+ .getRowLength()) != 0) {
+ // This is the first RowKey, or one different from the previous
+ if (outputKey != null) { //It's a different RowKey, so we need to write it
+ ImmutableBytesWritable aggregatedArray =
+ new ImmutableBytesWritable(bos.toByteArray());
+ outputStream.close();
+ context.write(new TableRowkeyPair(tableName, outputKey), aggregatedArray);
+ }
+ outputKey = new ImmutableBytesWritable(cell.getRowArray(), cell.getRowOffset()
+ , cell.getRowLength());
+ bos = new ByteArrayOutputStream(1024);
+ outputStream = new DataOutputStream(bos);
}
- WritableUtils.writeVInt(outputStream, i);
+ /*
+ The order of aggregation: type, index of column, length of value, value itself
+ */
outputStream.writeByte(cell.getTypeByte());
+ int i = findIndex(cell);
+ WritableUtils.writeVInt(outputStream, i);
WritableUtils.writeVInt(outputStream, cell.getValueLength());
outputStream.write(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
+
}
+ ImmutableBytesWritable aggregatedArray = new ImmutableBytesWritable(bos.toByteArray());
+ outputStream.close();
+ context.write(new TableRowkeyPair(tableName, outputKey), aggregatedArray);
}
- ImmutableBytesWritable aggregatedArray = new ImmutableBytesWritable(bos.toByteArray());
- outputStream.close();
- context.write(new TableRowkeyPair(tableNames.get(tableIndex), outputKey), aggregatedArray);
}
- protected boolean isEmptyCell(KeyValue cell) {
- if (Bytes.compareTo(cell.getQualifierArray(), cell.getQualifierOffset(),
- cell.getQualifierLength(), QueryConstants.EMPTY_COLUMN_BYTES, 0,
- QueryConstants.EMPTY_COLUMN_BYTES.length) != 0)
- return false;
- else
- return true;
- }
-
-
@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
try {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/18da4a04/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java
index 799b3dc..aa807c4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java
@@ -21,16 +21,18 @@ import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.sql.SQLException;
-import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.TreeMap;
import java.util.TreeSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;
+import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapreduce.Reducer;
@@ -42,7 +44,6 @@ import org.apache.phoenix.mapreduce.bulkload.TargetTableRefFunctions;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.schema.PColumn;
import org.apache.phoenix.schema.PTable;
-import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.Closeables;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.QueryUtil;
@@ -63,8 +64,8 @@ public class FormatToKeyValueReducer
protected List<String> tableNames;
protected List<String> logicalNames;
protected KeyValueBuilder builder;
- List<List<Pair<byte[], byte[]>>> columnIndexes;
- List<ImmutableBytesPtr> emptyFamilyName;
+ private Map<Integer, Pair<byte[], byte[]>> columnIndexes;
+ private Map<String, ImmutableBytesPtr> emptyFamilyName;
@Override
@@ -76,7 +77,6 @@ public class FormatToKeyValueReducer
for (Map.Entry<String, String> entry : conf) {
clientInfos.setProperty(entry.getKey(), entry.getValue());
}
-
try {
PhoenixConnection conn = (PhoenixConnection) QueryUtil.getConnectionOnServer(clientInfos, conf);
builder = conn.getKeyValueBuilder();
@@ -84,9 +84,6 @@ public class FormatToKeyValueReducer
final String logicalNamesConf = conf.get(FormatToBytesWritableMapper.LOGICAL_NAMES_CONFKEY);
tableNames = TargetTableRefFunctions.NAMES_FROM_JSON.apply(tableNamesConf);
logicalNames = TargetTableRefFunctions.NAMES_FROM_JSON.apply(logicalNamesConf);
-
- columnIndexes = new ArrayList<>(tableNames.size());
- emptyFamilyName = new ArrayList<>();
initColumnsMap(conn);
} catch (SQLException | ClassNotFoundException e) {
throw new RuntimeException(e);
@@ -94,24 +91,30 @@ public class FormatToKeyValueReducer
}
private void initColumnsMap(PhoenixConnection conn) throws SQLException {
- for (String tableName : logicalNames) {
- PTable table = PhoenixRuntime.getTable(conn, tableName);
- emptyFamilyName.add(SchemaUtil.getEmptyColumnFamilyPtr(table));
+ Map <byte[], Integer> indexMap = new TreeMap(Bytes.BYTES_COMPARATOR);
+ emptyFamilyName = new HashMap<>();
+ columnIndexes = new HashMap<>();
+ int columnIndex = 0;
+ for(int index = 0; index < logicalNames.size(); index++) {
+ PTable table = PhoenixRuntime.getTable(conn, logicalNames.get(index));
+ emptyFamilyName.put(tableNames.get(index), SchemaUtil.getEmptyColumnFamilyPtr(table));
List<PColumn> cls = table.getColumns();
- List<Pair<byte[], byte[]>> list = new ArrayList(cls.size());
for (int i = 0; i < cls.size(); i++) {
PColumn c = cls.get(i);
- if (c.getFamilyName() == null) {
- list.add(null); // Skip PK column
- continue;
+ byte[] family = new byte[0];
+ if (c.getFamilyName() != null) {
+ family = c.getFamilyName().getBytes();
}
- byte[] family = c.getFamilyName().getBytes();
byte[] name = c.getName().getBytes();
- list.add(new Pair(family, name));
+ byte[] cfn = Bytes.add(family,":".getBytes(), name);
+ Pair<byte[], byte[]> pair = new Pair(family, name);
+ if (!indexMap.containsKey(cfn)) {
+ indexMap.put(cfn, new Integer(columnIndex));
+ columnIndexes.put(new Integer(columnIndex), pair);
+ columnIndex++;
+ }
}
- columnIndexes.add(list);
}
-
}
@Override
@@ -119,15 +122,27 @@ public class FormatToKeyValueReducer
Reducer<TableRowkeyPair, ImmutableBytesWritable, TableRowkeyPair, KeyValue>.Context context)
throws IOException, InterruptedException {
TreeSet<KeyValue> map = new TreeSet<KeyValue>(KeyValue.COMPARATOR);
- int tableIndex = tableNames.indexOf(key.getTableName());
- List<Pair<byte[], byte[]>> columns = columnIndexes.get(tableIndex);
+ ImmutableBytesWritable rowKey = key.getRowkey();
for (ImmutableBytesWritable aggregatedArray : values) {
DataInputStream input = new DataInputStream(new ByteArrayInputStream(aggregatedArray.get()));
while (input.available() != 0) {
- int index = WritableUtils.readVInt(input);
- Pair<byte[], byte[]> pair = columns.get(index);
byte type = input.readByte();
- ImmutableBytesWritable value = null;
+ int index = WritableUtils.readVInt(input);
+ ImmutableBytesWritable family;
+ ImmutableBytesWritable name;
+ ImmutableBytesWritable value = QueryConstants.EMPTY_COLUMN_VALUE_BYTES_PTR;
+ if (index == -1) {
+ family = emptyFamilyName.get(key.getTableName());
+ name = QueryConstants.EMPTY_COLUMN_BYTES_PTR;
+ } else {
+ Pair<byte[], byte[]> pair = columnIndexes.get(index);
+ if(pair.getFirst() != null) {
+ family = new ImmutableBytesWritable(pair.getFirst());
+ } else {
+ family = emptyFamilyName.get(key.getTableName());
+ }
+ name = new ImmutableBytesWritable(pair.getSecond());
+ }
int len = WritableUtils.readVInt(input);
if (len > 0) {
byte[] array = new byte[len];
@@ -138,24 +153,16 @@ public class FormatToKeyValueReducer
KeyValue.Type kvType = KeyValue.Type.codeToType(type);
switch (kvType) {
case Put: // not null value
- kv = builder.buildPut(key.getRowkey(),
- new ImmutableBytesWritable(pair.getFirst()),
- new ImmutableBytesWritable(pair.getSecond()), value);
+ kv = builder.buildPut(key.getRowkey(), family, name, value);
break;
case DeleteColumn: // null value
- kv = builder.buildDeleteColumns(key.getRowkey(),
- new ImmutableBytesWritable(pair.getFirst()),
- new ImmutableBytesWritable(pair.getSecond()));
+ kv = builder.buildDeleteColumns(key.getRowkey(), family, name);
break;
default:
throw new IOException("Unsupported KeyValue type " + kvType);
}
map.add(kv);
}
- KeyValue empty = builder.buildPut(key.getRowkey(),
- emptyFamilyName.get(tableIndex),
- QueryConstants.EMPTY_COLUMN_BYTES_PTR, ByteUtil.EMPTY_BYTE_ARRAY_PTR);
- map.add(empty);
Closeables.closeQuietly(input);
}
context.setStatus("Read " + map.getClass());
[4/4] phoenix git commit: PHOENIX-2925 CsvBulkloadTool not working
properly if there are multiple local indexes to the same table(After
PHOENIX-1973)
Posted by ss...@apache.org.
PHOENIX-2925 CsvBulkloadTool not working properly if there are multiple local indexes to the same table(After PHOENIX-1973)
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/05dc0e32
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/05dc0e32
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/05dc0e32
Branch: refs/heads/4.x-HBase-1.1
Commit: 05dc0e32091ac356c71399d81d2cabd394980e0c
Parents: e36c9c6
Author: Sergey Soldatov <se...@gmail.com>
Authored: Mon May 23 02:22:45 2016 -0700
Committer: Sergey Soldatov <se...@gmail.com>
Committed: Mon May 23 13:05:36 2016 -0700
----------------------------------------------------------------------
.../phoenix/end2end/CsvBulkLoadToolIT.java | 4 +
.../mapreduce/FormatToBytesWritableMapper.java | 111 +++++++++----------
.../mapreduce/FormatToKeyValueReducer.java | 75 +++++++------
3 files changed, 100 insertions(+), 90 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/05dc0e32/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java
index 1e9c1d9..8968555 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java
@@ -277,6 +277,10 @@ public class CsvBulkLoadToolIT extends BaseOwnClusterHBaseManagedTimeIT {
assertEquals(2, rs.getInt(1));
assertEquals("FirstName 2", rs.getString(2));
+ rs = stmt.executeQuery("SELECT LAST_NAME FROM TABLE6 where last_name='LastName 1'");
+ assertTrue(rs.next());
+ assertEquals("LastName 1", rs.getString(1));
+
rs.close();
stmt.close();
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/05dc0e32/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
index ff21d4f..eb0e3ed 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
@@ -44,7 +44,6 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.mapreduce.bulkload.TableRowkeyPair;
import org.apache.phoenix.mapreduce.bulkload.TargetTableRefFunctions;
import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
-import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.schema.PColumn;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.util.ColumnInfo;
@@ -108,7 +107,7 @@ public abstract class FormatToBytesWritableMapper<RECORD> extends Mapper<LongWri
/*
lookup table for column index. Index in the List matches to the index in tableNames List
*/
- protected List<Map<byte[], Map<byte[], Integer>>> columnIndexes;
+ protected Map<byte[], Integer> columnIndexes;
protected abstract UpsertExecutor<RECORD,?> buildUpsertExecutor(Configuration conf);
protected abstract LineParser<RECORD> getLineParser();
@@ -135,7 +134,7 @@ public abstract class FormatToBytesWritableMapper<RECORD> extends Mapper<LongWri
tableNames = TargetTableRefFunctions.NAMES_FROM_JSON.apply(tableNamesConf);
logicalNames = TargetTableRefFunctions.NAMES_FROM_JSON.apply(logicalNamesConf);
- columnIndexes = initColumnIndexes();
+ initColumnIndexes();
} catch (SQLException | ClassNotFoundException e) {
throw new RuntimeException(e);
}
@@ -193,7 +192,7 @@ public abstract class FormatToBytesWritableMapper<RECORD> extends Mapper<LongWri
int tableIndex = rowEntry.getKey();
List<KeyValue> lkv = rowEntry.getValue();
// All KV values combines to a single byte array
- writeAggregatedRow(context, tableIndex, lkv);
+ writeAggregatedRow(context, tableNames.get(tableIndex), lkv);
}
conn.rollback();
} catch (Exception e) {
@@ -201,99 +200,99 @@ public abstract class FormatToBytesWritableMapper<RECORD> extends Mapper<LongWri
}
}
- private List<Map<byte[], Map<byte[], Integer>>> initColumnIndexes() throws SQLException {
- List<Map<byte[], Map<byte[], Integer>>> tableMap = new ArrayList<>();
- int tableIndex;
- for (tableIndex = 0; tableIndex < tableNames.size(); tableIndex++) {
- PTable table = PhoenixRuntime.getTable(conn, logicalNames.get(tableIndex));
- Map<byte[], Map<byte[], Integer>> columnMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+ /*
+ Map all unique pairs <family, name> to index. Table name is part of TableRowkey, so we do
+ not care about it
+ */
+ private void initColumnIndexes() throws SQLException {
+ columnIndexes = new TreeMap(Bytes.BYTES_COMPARATOR);
+ int columnIndex = 0;
+ for(int index = 0; index < logicalNames.size(); index++) {
+ PTable table = PhoenixRuntime.getTable(conn, logicalNames.get(index));
List<PColumn> cls = table.getColumns();
for (int i = 0; i < cls.size(); i++) {
PColumn c = cls.get(i);
- if (c.getFamilyName() == null) continue; // Skip PK column
- byte[] family = c.getFamilyName().getBytes();
+ byte[] family = new byte[0];
+ if (c.getFamilyName() != null) // Skip PK column
+ family = c.getFamilyName().getBytes();
byte[] name = c.getName().getBytes();
- if (!columnMap.containsKey(family)) {
- columnMap.put(family, new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR));
+ byte[] cfn = Bytes.add(family,":".getBytes(), name);
+ if (!columnIndexes.containsKey(cfn)) {
+ columnIndexes.put(cfn, new Integer(columnIndex));
+ columnIndex++;
}
- Map<byte[], Integer> qualifier = columnMap.get(family);
- qualifier.put(name, i);
}
- tableMap.add(columnMap);
}
- return tableMap;
}
/**
* Find the column index which will replace the column name in
* the aggregated array and will be restored in Reducer
*
- * @param tableIndex Table index in tableNames list
* @param cell KeyValue for the column
* @return column index for the specified cell or -1 if was not found
*/
- private int findIndex(int tableIndex, Cell cell) {
- Map<byte[], Map<byte[], Integer>> columnMap = columnIndexes.get(tableIndex);
- Map<byte[], Integer> qualifiers = columnMap.get(Bytes.copy(cell.getFamilyArray(),
- cell.getFamilyOffset(), cell.getFamilyLength()));
- if (qualifiers != null) {
- Integer result = qualifiers.get(Bytes.copy(cell.getQualifierArray(),
- cell.getQualifierOffset(), cell.getQualifierLength()));
- if (result != null) {
- return result;
- }
+ private int findIndex(Cell cell) {
+ byte[] familyName = Bytes.copy(cell.getFamilyArray(), cell.getFamilyOffset(),
+ cell.getFamilyLength());
+ byte[] name = Bytes.copy(cell.getQualifierArray(), cell.getQualifierOffset(),
+ cell.getQualifierLength());
+ byte[] cfn = Bytes.add(familyName, ":".getBytes(), name);
+ if(columnIndexes.containsKey(cfn)) {
+ return columnIndexes.get(cfn);
}
return -1;
}
/**
- * Collect all column values for the same rowKey
+ * Collect all column values for the same Row. RowKey may be different if indexes are involved,
+ * so it writes a separate record for each unique RowKey
*
* @param context Current mapper context
- * @param tableIndex Table index in tableNames list
+ * @param tableName Name of the target table (an entry of the tableNames list)
* @param lkv List of KV values that will be combined in a single ImmutableBytesWritable
* @throws IOException
* @throws InterruptedException
*/
- private void writeAggregatedRow(Context context, int tableIndex, List<KeyValue> lkv)
+ private void writeAggregatedRow(Context context, String tableName, List<KeyValue> lkv)
throws IOException, InterruptedException {
ByteArrayOutputStream bos = new ByteArrayOutputStream(1024);
DataOutputStream outputStream = new DataOutputStream(bos);
- ImmutableBytesWritable outputKey = new ImmutableBytesWritable();
+ ImmutableBytesWritable outputKey =null;
if (!lkv.isEmpty()) {
- // All Key Values for the same row are supposed to be the same, so init rowKey only once
- Cell first = lkv.get(0);
- outputKey.set(first.getRowArray(), first.getRowOffset(), first.getRowLength());
for (KeyValue cell : lkv) {
- if (isEmptyCell(cell)) {
- continue;
- }
- int i = findIndex(tableIndex, cell);
- if (i == -1) {
- throw new IOException("No column found for KeyValue");
+ if (outputKey == null || Bytes.compareTo(outputKey.get(), outputKey.getOffset(),
+ outputKey.getLength(), cell.getRowArray(), cell.getRowOffset(), cell
+ .getRowLength()) != 0) {
+ // This is the first RowKey, or one different from the previous
+ if (outputKey != null) { //It's a different RowKey, so we need to write it
+ ImmutableBytesWritable aggregatedArray =
+ new ImmutableBytesWritable(bos.toByteArray());
+ outputStream.close();
+ context.write(new TableRowkeyPair(tableName, outputKey), aggregatedArray);
+ }
+ outputKey = new ImmutableBytesWritable(cell.getRowArray(), cell.getRowOffset()
+ , cell.getRowLength());
+ bos = new ByteArrayOutputStream(1024);
+ outputStream = new DataOutputStream(bos);
}
- WritableUtils.writeVInt(outputStream, i);
+ /*
+ The order of aggregation: type, index of column, length of value, value itself
+ */
outputStream.writeByte(cell.getTypeByte());
+ int i = findIndex(cell);
+ WritableUtils.writeVInt(outputStream, i);
WritableUtils.writeVInt(outputStream, cell.getValueLength());
outputStream.write(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
+
}
+ ImmutableBytesWritable aggregatedArray = new ImmutableBytesWritable(bos.toByteArray());
+ outputStream.close();
+ context.write(new TableRowkeyPair(tableName, outputKey), aggregatedArray);
}
- ImmutableBytesWritable aggregatedArray = new ImmutableBytesWritable(bos.toByteArray());
- outputStream.close();
- context.write(new TableRowkeyPair(tableNames.get(tableIndex), outputKey), aggregatedArray);
}
- protected boolean isEmptyCell(KeyValue cell) {
- if (Bytes.compareTo(cell.getQualifierArray(), cell.getQualifierOffset(),
- cell.getQualifierLength(), QueryConstants.EMPTY_COLUMN_BYTES, 0,
- QueryConstants.EMPTY_COLUMN_BYTES.length) != 0)
- return false;
- else
- return true;
- }
-
-
@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
try {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/05dc0e32/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java
index 799b3dc..aa807c4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java
@@ -21,16 +21,18 @@ import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.sql.SQLException;
-import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.TreeMap;
import java.util.TreeSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;
+import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapreduce.Reducer;
@@ -42,7 +44,6 @@ import org.apache.phoenix.mapreduce.bulkload.TargetTableRefFunctions;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.schema.PColumn;
import org.apache.phoenix.schema.PTable;
-import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.Closeables;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.QueryUtil;
@@ -63,8 +64,8 @@ public class FormatToKeyValueReducer
protected List<String> tableNames;
protected List<String> logicalNames;
protected KeyValueBuilder builder;
- List<List<Pair<byte[], byte[]>>> columnIndexes;
- List<ImmutableBytesPtr> emptyFamilyName;
+ private Map<Integer, Pair<byte[], byte[]>> columnIndexes;
+ private Map<String, ImmutableBytesPtr> emptyFamilyName;
@Override
@@ -76,7 +77,6 @@ public class FormatToKeyValueReducer
for (Map.Entry<String, String> entry : conf) {
clientInfos.setProperty(entry.getKey(), entry.getValue());
}
-
try {
PhoenixConnection conn = (PhoenixConnection) QueryUtil.getConnectionOnServer(clientInfos, conf);
builder = conn.getKeyValueBuilder();
@@ -84,9 +84,6 @@ public class FormatToKeyValueReducer
final String logicalNamesConf = conf.get(FormatToBytesWritableMapper.LOGICAL_NAMES_CONFKEY);
tableNames = TargetTableRefFunctions.NAMES_FROM_JSON.apply(tableNamesConf);
logicalNames = TargetTableRefFunctions.NAMES_FROM_JSON.apply(logicalNamesConf);
-
- columnIndexes = new ArrayList<>(tableNames.size());
- emptyFamilyName = new ArrayList<>();
initColumnsMap(conn);
} catch (SQLException | ClassNotFoundException e) {
throw new RuntimeException(e);
@@ -94,24 +91,30 @@ public class FormatToKeyValueReducer
}
private void initColumnsMap(PhoenixConnection conn) throws SQLException {
- for (String tableName : logicalNames) {
- PTable table = PhoenixRuntime.getTable(conn, tableName);
- emptyFamilyName.add(SchemaUtil.getEmptyColumnFamilyPtr(table));
+ Map <byte[], Integer> indexMap = new TreeMap(Bytes.BYTES_COMPARATOR);
+ emptyFamilyName = new HashMap<>();
+ columnIndexes = new HashMap<>();
+ int columnIndex = 0;
+ for(int index = 0; index < logicalNames.size(); index++) {
+ PTable table = PhoenixRuntime.getTable(conn, logicalNames.get(index));
+ emptyFamilyName.put(tableNames.get(index), SchemaUtil.getEmptyColumnFamilyPtr(table));
List<PColumn> cls = table.getColumns();
- List<Pair<byte[], byte[]>> list = new ArrayList(cls.size());
for (int i = 0; i < cls.size(); i++) {
PColumn c = cls.get(i);
- if (c.getFamilyName() == null) {
- list.add(null); // Skip PK column
- continue;
+ byte[] family = new byte[0];
+ if (c.getFamilyName() != null) {
+ family = c.getFamilyName().getBytes();
}
- byte[] family = c.getFamilyName().getBytes();
byte[] name = c.getName().getBytes();
- list.add(new Pair(family, name));
+ byte[] cfn = Bytes.add(family,":".getBytes(), name);
+ Pair<byte[], byte[]> pair = new Pair(family, name);
+ if (!indexMap.containsKey(cfn)) {
+ indexMap.put(cfn, new Integer(columnIndex));
+ columnIndexes.put(new Integer(columnIndex), pair);
+ columnIndex++;
+ }
}
- columnIndexes.add(list);
}
-
}
@Override
@@ -119,15 +122,27 @@ public class FormatToKeyValueReducer
Reducer<TableRowkeyPair, ImmutableBytesWritable, TableRowkeyPair, KeyValue>.Context context)
throws IOException, InterruptedException {
TreeSet<KeyValue> map = new TreeSet<KeyValue>(KeyValue.COMPARATOR);
- int tableIndex = tableNames.indexOf(key.getTableName());
- List<Pair<byte[], byte[]>> columns = columnIndexes.get(tableIndex);
+ ImmutableBytesWritable rowKey = key.getRowkey();
for (ImmutableBytesWritable aggregatedArray : values) {
DataInputStream input = new DataInputStream(new ByteArrayInputStream(aggregatedArray.get()));
while (input.available() != 0) {
- int index = WritableUtils.readVInt(input);
- Pair<byte[], byte[]> pair = columns.get(index);
byte type = input.readByte();
- ImmutableBytesWritable value = null;
+ int index = WritableUtils.readVInt(input);
+ ImmutableBytesWritable family;
+ ImmutableBytesWritable name;
+ ImmutableBytesWritable value = QueryConstants.EMPTY_COLUMN_VALUE_BYTES_PTR;
+ if (index == -1) {
+ family = emptyFamilyName.get(key.getTableName());
+ name = QueryConstants.EMPTY_COLUMN_BYTES_PTR;
+ } else {
+ Pair<byte[], byte[]> pair = columnIndexes.get(index);
+ if(pair.getFirst() != null) {
+ family = new ImmutableBytesWritable(pair.getFirst());
+ } else {
+ family = emptyFamilyName.get(key.getTableName());
+ }
+ name = new ImmutableBytesWritable(pair.getSecond());
+ }
int len = WritableUtils.readVInt(input);
if (len > 0) {
byte[] array = new byte[len];
@@ -138,24 +153,16 @@ public class FormatToKeyValueReducer
KeyValue.Type kvType = KeyValue.Type.codeToType(type);
switch (kvType) {
case Put: // not null value
- kv = builder.buildPut(key.getRowkey(),
- new ImmutableBytesWritable(pair.getFirst()),
- new ImmutableBytesWritable(pair.getSecond()), value);
+ kv = builder.buildPut(key.getRowkey(), family, name, value);
break;
case DeleteColumn: // null value
- kv = builder.buildDeleteColumns(key.getRowkey(),
- new ImmutableBytesWritable(pair.getFirst()),
- new ImmutableBytesWritable(pair.getSecond()));
+ kv = builder.buildDeleteColumns(key.getRowkey(), family, name);
break;
default:
throw new IOException("Unsupported KeyValue type " + kvType);
}
map.add(kv);
}
- KeyValue empty = builder.buildPut(key.getRowkey(),
- emptyFamilyName.get(tableIndex),
- QueryConstants.EMPTY_COLUMN_BYTES_PTR, ByteUtil.EMPTY_BYTE_ARRAY_PTR);
- map.add(empty);
Closeables.closeQuietly(input);
}
context.setStatus("Read " + map.getClass());
[2/4] phoenix git commit: PHOENIX-2925 CsvBulkloadTool not working
properly if there are multiple local indexes to the same table(After
PHOENIX-1973)
Posted by ss...@apache.org.
PHOENIX-2925 CsvBulkloadTool not working properly if there are multiple local indexes to the same table(After PHOENIX-1973)
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/9569cb52
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/9569cb52
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/9569cb52
Branch: refs/heads/4.x-HBase-0.98
Commit: 9569cb5212da949763742b6106314a1a07b0f771
Parents: 09747fc
Author: Sergey Soldatov <se...@gmail.com>
Authored: Mon May 23 02:22:45 2016 -0700
Committer: Sergey Soldatov <se...@gmail.com>
Committed: Mon May 23 10:38:39 2016 -0700
----------------------------------------------------------------------
.../phoenix/end2end/CsvBulkLoadToolIT.java | 4 +
.../mapreduce/FormatToBytesWritableMapper.java | 111 +++++++++----------
.../mapreduce/FormatToKeyValueReducer.java | 75 +++++++------
3 files changed, 100 insertions(+), 90 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/9569cb52/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java
index 1e9c1d9..8968555 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java
@@ -277,6 +277,10 @@ public class CsvBulkLoadToolIT extends BaseOwnClusterHBaseManagedTimeIT {
assertEquals(2, rs.getInt(1));
assertEquals("FirstName 2", rs.getString(2));
+ rs = stmt.executeQuery("SELECT LAST_NAME FROM TABLE6 where last_name='LastName 1'");
+ assertTrue(rs.next());
+ assertEquals("LastName 1", rs.getString(1));
+
rs.close();
stmt.close();
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/9569cb52/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
index ff21d4f..eb0e3ed 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
@@ -44,7 +44,6 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.mapreduce.bulkload.TableRowkeyPair;
import org.apache.phoenix.mapreduce.bulkload.TargetTableRefFunctions;
import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
-import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.schema.PColumn;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.util.ColumnInfo;
@@ -108,7 +107,7 @@ public abstract class FormatToBytesWritableMapper<RECORD> extends Mapper<LongWri
/*
lookup table for column index. Index in the List matches to the index in tableNames List
*/
- protected List<Map<byte[], Map<byte[], Integer>>> columnIndexes;
+ protected Map<byte[], Integer> columnIndexes;
protected abstract UpsertExecutor<RECORD,?> buildUpsertExecutor(Configuration conf);
protected abstract LineParser<RECORD> getLineParser();
@@ -135,7 +134,7 @@ public abstract class FormatToBytesWritableMapper<RECORD> extends Mapper<LongWri
tableNames = TargetTableRefFunctions.NAMES_FROM_JSON.apply(tableNamesConf);
logicalNames = TargetTableRefFunctions.NAMES_FROM_JSON.apply(logicalNamesConf);
- columnIndexes = initColumnIndexes();
+ initColumnIndexes();
} catch (SQLException | ClassNotFoundException e) {
throw new RuntimeException(e);
}
@@ -193,7 +192,7 @@ public abstract class FormatToBytesWritableMapper<RECORD> extends Mapper<LongWri
int tableIndex = rowEntry.getKey();
List<KeyValue> lkv = rowEntry.getValue();
// All KV values combines to a single byte array
- writeAggregatedRow(context, tableIndex, lkv);
+ writeAggregatedRow(context, tableNames.get(tableIndex), lkv);
}
conn.rollback();
} catch (Exception e) {
@@ -201,99 +200,99 @@ public abstract class FormatToBytesWritableMapper<RECORD> extends Mapper<LongWri
}
}
- private List<Map<byte[], Map<byte[], Integer>>> initColumnIndexes() throws SQLException {
- List<Map<byte[], Map<byte[], Integer>>> tableMap = new ArrayList<>();
- int tableIndex;
- for (tableIndex = 0; tableIndex < tableNames.size(); tableIndex++) {
- PTable table = PhoenixRuntime.getTable(conn, logicalNames.get(tableIndex));
- Map<byte[], Map<byte[], Integer>> columnMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+ /*
+ Map all unique pairs <family, name> to index. Table name is part of TableRowkey, so we do
+ not care about it
+ */
+ private void initColumnIndexes() throws SQLException {
+ columnIndexes = new TreeMap(Bytes.BYTES_COMPARATOR);
+ int columnIndex = 0;
+ for(int index = 0; index < logicalNames.size(); index++) {
+ PTable table = PhoenixRuntime.getTable(conn, logicalNames.get(index));
List<PColumn> cls = table.getColumns();
for (int i = 0; i < cls.size(); i++) {
PColumn c = cls.get(i);
- if (c.getFamilyName() == null) continue; // Skip PK column
- byte[] family = c.getFamilyName().getBytes();
+ byte[] family = new byte[0];
+ if (c.getFamilyName() != null) // PK columns have no family and keep the empty one
+ family = c.getFamilyName().getBytes();
byte[] name = c.getName().getBytes();
- if (!columnMap.containsKey(family)) {
- columnMap.put(family, new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR));
+ byte[] cfn = Bytes.add(family,":".getBytes(), name);
+ if (!columnIndexes.containsKey(cfn)) {
+ columnIndexes.put(cfn, new Integer(columnIndex));
+ columnIndex++;
}
- Map<byte[], Integer> qualifier = columnMap.get(family);
- qualifier.put(name, i);
}
- tableMap.add(columnMap);
}
- return tableMap;
}
/**
* Find the column index which will replace the column name in
* the aggregated array and will be restored in Reducer
*
- * @param tableIndex Table index in tableNames list
* @param cell KeyValue for the column
* @return column index for the specified cell or -1 if was not found
*/
- private int findIndex(int tableIndex, Cell cell) {
- Map<byte[], Map<byte[], Integer>> columnMap = columnIndexes.get(tableIndex);
- Map<byte[], Integer> qualifiers = columnMap.get(Bytes.copy(cell.getFamilyArray(),
- cell.getFamilyOffset(), cell.getFamilyLength()));
- if (qualifiers != null) {
- Integer result = qualifiers.get(Bytes.copy(cell.getQualifierArray(),
- cell.getQualifierOffset(), cell.getQualifierLength()));
- if (result != null) {
- return result;
- }
+ private int findIndex(Cell cell) {
+ byte[] familyName = Bytes.copy(cell.getFamilyArray(), cell.getFamilyOffset(),
+ cell.getFamilyLength());
+ byte[] name = Bytes.copy(cell.getQualifierArray(), cell.getQualifierOffset(),
+ cell.getQualifierLength());
+ byte[] cfn = Bytes.add(familyName, ":".getBytes(), name);
+ if(columnIndexes.containsKey(cfn)) {
+ return columnIndexes.get(cfn);
}
return -1;
}
/**
- * Collect all column values for the same rowKey
+ * Collect all column values for the same Row. RowKey may be different if indexes are involved,
+ * so it writes a separate record for each unique RowKey
*
* @param context Current mapper context
- * @param tableIndex Table index in tableNames list
+ * @param tableName Name of the target table (an entry of the tableNames list)
* @param lkv List of KV values that will be combined in a single ImmutableBytesWritable
* @throws IOException
* @throws InterruptedException
*/
- private void writeAggregatedRow(Context context, int tableIndex, List<KeyValue> lkv)
+ private void writeAggregatedRow(Context context, String tableName, List<KeyValue> lkv)
throws IOException, InterruptedException {
ByteArrayOutputStream bos = new ByteArrayOutputStream(1024);
DataOutputStream outputStream = new DataOutputStream(bos);
- ImmutableBytesWritable outputKey = new ImmutableBytesWritable();
+ ImmutableBytesWritable outputKey =null;
if (!lkv.isEmpty()) {
- // All Key Values for the same row are supposed to be the same, so init rowKey only once
- Cell first = lkv.get(0);
- outputKey.set(first.getRowArray(), first.getRowOffset(), first.getRowLength());
for (KeyValue cell : lkv) {
- if (isEmptyCell(cell)) {
- continue;
- }
- int i = findIndex(tableIndex, cell);
- if (i == -1) {
- throw new IOException("No column found for KeyValue");
+ if (outputKey == null || Bytes.compareTo(outputKey.get(), outputKey.getOffset(),
+ outputKey.getLength(), cell.getRowArray(), cell.getRowOffset(), cell
+ .getRowLength()) != 0) {
+ // This is the first RowKey, or one that differs from the previous one
+ if (outputKey != null) { //It's a different RowKey, so we need to write it
+ ImmutableBytesWritable aggregatedArray =
+ new ImmutableBytesWritable(bos.toByteArray());
+ outputStream.close();
+ context.write(new TableRowkeyPair(tableName, outputKey), aggregatedArray);
+ }
+ outputKey = new ImmutableBytesWritable(cell.getRowArray(), cell.getRowOffset()
+ , cell.getRowLength());
+ bos = new ByteArrayOutputStream(1024);
+ outputStream = new DataOutputStream(bos);
}
- WritableUtils.writeVInt(outputStream, i);
+ /*
+ The order of aggregation: type, index of column, length of value, value itself
+ */
outputStream.writeByte(cell.getTypeByte());
+ int i = findIndex(cell);
+ WritableUtils.writeVInt(outputStream, i);
WritableUtils.writeVInt(outputStream, cell.getValueLength());
outputStream.write(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
+
}
+ ImmutableBytesWritable aggregatedArray = new ImmutableBytesWritable(bos.toByteArray());
+ outputStream.close();
+ context.write(new TableRowkeyPair(tableName, outputKey), aggregatedArray);
}
- ImmutableBytesWritable aggregatedArray = new ImmutableBytesWritable(bos.toByteArray());
- outputStream.close();
- context.write(new TableRowkeyPair(tableNames.get(tableIndex), outputKey), aggregatedArray);
}
- protected boolean isEmptyCell(KeyValue cell) {
- if (Bytes.compareTo(cell.getQualifierArray(), cell.getQualifierOffset(),
- cell.getQualifierLength(), QueryConstants.EMPTY_COLUMN_BYTES, 0,
- QueryConstants.EMPTY_COLUMN_BYTES.length) != 0)
- return false;
- else
- return true;
- }
-
-
@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
try {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/9569cb52/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java
index 799b3dc..aa807c4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java
@@ -21,16 +21,18 @@ import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.sql.SQLException;
-import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.TreeMap;
import java.util.TreeSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;
+import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapreduce.Reducer;
@@ -42,7 +44,6 @@ import org.apache.phoenix.mapreduce.bulkload.TargetTableRefFunctions;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.schema.PColumn;
import org.apache.phoenix.schema.PTable;
-import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.Closeables;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.QueryUtil;
@@ -63,8 +64,8 @@ public class FormatToKeyValueReducer
protected List<String> tableNames;
protected List<String> logicalNames;
protected KeyValueBuilder builder;
- List<List<Pair<byte[], byte[]>>> columnIndexes;
- List<ImmutableBytesPtr> emptyFamilyName;
+ private Map<Integer, Pair<byte[], byte[]>> columnIndexes;
+ private Map<String, ImmutableBytesPtr> emptyFamilyName;
@Override
@@ -76,7 +77,6 @@ public class FormatToKeyValueReducer
for (Map.Entry<String, String> entry : conf) {
clientInfos.setProperty(entry.getKey(), entry.getValue());
}
-
try {
PhoenixConnection conn = (PhoenixConnection) QueryUtil.getConnectionOnServer(clientInfos, conf);
builder = conn.getKeyValueBuilder();
@@ -84,9 +84,6 @@ public class FormatToKeyValueReducer
final String logicalNamesConf = conf.get(FormatToBytesWritableMapper.LOGICAL_NAMES_CONFKEY);
tableNames = TargetTableRefFunctions.NAMES_FROM_JSON.apply(tableNamesConf);
logicalNames = TargetTableRefFunctions.NAMES_FROM_JSON.apply(logicalNamesConf);
-
- columnIndexes = new ArrayList<>(tableNames.size());
- emptyFamilyName = new ArrayList<>();
initColumnsMap(conn);
} catch (SQLException | ClassNotFoundException e) {
throw new RuntimeException(e);
@@ -94,24 +91,30 @@ public class FormatToKeyValueReducer
}
private void initColumnsMap(PhoenixConnection conn) throws SQLException {
- for (String tableName : logicalNames) {
- PTable table = PhoenixRuntime.getTable(conn, tableName);
- emptyFamilyName.add(SchemaUtil.getEmptyColumnFamilyPtr(table));
+ Map <byte[], Integer> indexMap = new TreeMap(Bytes.BYTES_COMPARATOR);
+ emptyFamilyName = new HashMap<>();
+ columnIndexes = new HashMap<>();
+ int columnIndex = 0;
+ for(int index = 0; index < logicalNames.size(); index++) {
+ PTable table = PhoenixRuntime.getTable(conn, logicalNames.get(index));
+ emptyFamilyName.put(tableNames.get(index), SchemaUtil.getEmptyColumnFamilyPtr(table));
List<PColumn> cls = table.getColumns();
- List<Pair<byte[], byte[]>> list = new ArrayList(cls.size());
for (int i = 0; i < cls.size(); i++) {
PColumn c = cls.get(i);
- if (c.getFamilyName() == null) {
- list.add(null); // Skip PK column
- continue;
+ byte[] family = new byte[0];
+ if (c.getFamilyName() != null) {
+ family = c.getFamilyName().getBytes();
}
- byte[] family = c.getFamilyName().getBytes();
byte[] name = c.getName().getBytes();
- list.add(new Pair(family, name));
+ byte[] cfn = Bytes.add(family,":".getBytes(), name);
+ Pair<byte[], byte[]> pair = new Pair(family, name);
+ if (!indexMap.containsKey(cfn)) {
+ indexMap.put(cfn, new Integer(columnIndex));
+ columnIndexes.put(new Integer(columnIndex), pair);
+ columnIndex++;
+ }
}
- columnIndexes.add(list);
}
-
}
@Override
@@ -119,15 +122,27 @@ public class FormatToKeyValueReducer
Reducer<TableRowkeyPair, ImmutableBytesWritable, TableRowkeyPair, KeyValue>.Context context)
throws IOException, InterruptedException {
TreeSet<KeyValue> map = new TreeSet<KeyValue>(KeyValue.COMPARATOR);
- int tableIndex = tableNames.indexOf(key.getTableName());
- List<Pair<byte[], byte[]>> columns = columnIndexes.get(tableIndex);
+ ImmutableBytesWritable rowKey = key.getRowkey();
for (ImmutableBytesWritable aggregatedArray : values) {
DataInputStream input = new DataInputStream(new ByteArrayInputStream(aggregatedArray.get()));
while (input.available() != 0) {
- int index = WritableUtils.readVInt(input);
- Pair<byte[], byte[]> pair = columns.get(index);
byte type = input.readByte();
- ImmutableBytesWritable value = null;
+ int index = WritableUtils.readVInt(input);
+ ImmutableBytesWritable family;
+ ImmutableBytesWritable name;
+ ImmutableBytesWritable value = QueryConstants.EMPTY_COLUMN_VALUE_BYTES_PTR;
+ if (index == -1) {
+ family = emptyFamilyName.get(key.getTableName());
+ name = QueryConstants.EMPTY_COLUMN_BYTES_PTR;
+ } else {
+ Pair<byte[], byte[]> pair = columnIndexes.get(index);
+ if(pair.getFirst() != null) {
+ family = new ImmutableBytesWritable(pair.getFirst());
+ } else {
+ family = emptyFamilyName.get(key.getTableName());
+ }
+ name = new ImmutableBytesWritable(pair.getSecond());
+ }
int len = WritableUtils.readVInt(input);
if (len > 0) {
byte[] array = new byte[len];
@@ -138,24 +153,16 @@ public class FormatToKeyValueReducer
KeyValue.Type kvType = KeyValue.Type.codeToType(type);
switch (kvType) {
case Put: // not null value
- kv = builder.buildPut(key.getRowkey(),
- new ImmutableBytesWritable(pair.getFirst()),
- new ImmutableBytesWritable(pair.getSecond()), value);
+ kv = builder.buildPut(key.getRowkey(), family, name, value);
break;
case DeleteColumn: // null value
- kv = builder.buildDeleteColumns(key.getRowkey(),
- new ImmutableBytesWritable(pair.getFirst()),
- new ImmutableBytesWritable(pair.getSecond()));
+ kv = builder.buildDeleteColumns(key.getRowkey(), family, name);
break;
default:
throw new IOException("Unsupported KeyValue type " + kvType);
}
map.add(kv);
}
- KeyValue empty = builder.buildPut(key.getRowkey(),
- emptyFamilyName.get(tableIndex),
- QueryConstants.EMPTY_COLUMN_BYTES_PTR, ByteUtil.EMPTY_BYTE_ARRAY_PTR);
- map.add(empty);
Closeables.closeQuietly(input);
}
context.setStatus("Read " + map.getClass());
[3/4] phoenix git commit: PHOENIX-2925 CsvBulkloadTool not working
properly if there are multiple local indexes to the same table(After
PHOENIX-1973)
Posted by ss...@apache.org.
PHOENIX-2925 CsvBulkloadTool not working properly if there are multiple local indexes to the same table(After PHOENIX-1973)
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/ec44a57c
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/ec44a57c
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/ec44a57c
Branch: refs/heads/4.x-HBase-1.0
Commit: ec44a57cc9836a3688a58c88d1c5547378fd2e17
Parents: 4d07528
Author: Sergey Soldatov <se...@gmail.com>
Authored: Mon May 23 02:22:45 2016 -0700
Committer: Sergey Soldatov <se...@gmail.com>
Committed: Mon May 23 12:26:26 2016 -0700
----------------------------------------------------------------------
.../phoenix/end2end/CsvBulkLoadToolIT.java | 4 +
.../mapreduce/FormatToBytesWritableMapper.java | 111 +++++++++----------
.../mapreduce/FormatToKeyValueReducer.java | 75 +++++++------
3 files changed, 100 insertions(+), 90 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/ec44a57c/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java
index 1e9c1d9..8968555 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java
@@ -277,6 +277,10 @@ public class CsvBulkLoadToolIT extends BaseOwnClusterHBaseManagedTimeIT {
assertEquals(2, rs.getInt(1));
assertEquals("FirstName 2", rs.getString(2));
+ rs = stmt.executeQuery("SELECT LAST_NAME FROM TABLE6 where last_name='LastName 1'");
+ assertTrue(rs.next());
+ assertEquals("LastName 1", rs.getString(1));
+
rs.close();
stmt.close();
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/ec44a57c/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
index ff21d4f..eb0e3ed 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
@@ -44,7 +44,6 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.mapreduce.bulkload.TableRowkeyPair;
import org.apache.phoenix.mapreduce.bulkload.TargetTableRefFunctions;
import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
-import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.schema.PColumn;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.util.ColumnInfo;
@@ -108,7 +107,7 @@ public abstract class FormatToBytesWritableMapper<RECORD> extends Mapper<LongWri
/*
lookup table for column index. Index in the List matches to the index in tableNames List
*/
- protected List<Map<byte[], Map<byte[], Integer>>> columnIndexes;
+ protected Map<byte[], Integer> columnIndexes;
protected abstract UpsertExecutor<RECORD,?> buildUpsertExecutor(Configuration conf);
protected abstract LineParser<RECORD> getLineParser();
@@ -135,7 +134,7 @@ public abstract class FormatToBytesWritableMapper<RECORD> extends Mapper<LongWri
tableNames = TargetTableRefFunctions.NAMES_FROM_JSON.apply(tableNamesConf);
logicalNames = TargetTableRefFunctions.NAMES_FROM_JSON.apply(logicalNamesConf);
- columnIndexes = initColumnIndexes();
+ initColumnIndexes();
} catch (SQLException | ClassNotFoundException e) {
throw new RuntimeException(e);
}
@@ -193,7 +192,7 @@ public abstract class FormatToBytesWritableMapper<RECORD> extends Mapper<LongWri
int tableIndex = rowEntry.getKey();
List<KeyValue> lkv = rowEntry.getValue();
// All KV values combines to a single byte array
- writeAggregatedRow(context, tableIndex, lkv);
+ writeAggregatedRow(context, tableNames.get(tableIndex), lkv);
}
conn.rollback();
} catch (Exception e) {
@@ -201,99 +200,99 @@ public abstract class FormatToBytesWritableMapper<RECORD> extends Mapper<LongWri
}
}
- private List<Map<byte[], Map<byte[], Integer>>> initColumnIndexes() throws SQLException {
- List<Map<byte[], Map<byte[], Integer>>> tableMap = new ArrayList<>();
- int tableIndex;
- for (tableIndex = 0; tableIndex < tableNames.size(); tableIndex++) {
- PTable table = PhoenixRuntime.getTable(conn, logicalNames.get(tableIndex));
- Map<byte[], Map<byte[], Integer>> columnMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+ /*
+ Map all unique pairs <family, name> to index. Table name is part of TableRowkey, so we do
+ not care about it
+ */
+ private void initColumnIndexes() throws SQLException {
+ columnIndexes = new TreeMap(Bytes.BYTES_COMPARATOR);
+ int columnIndex = 0;
+ for(int index = 0; index < logicalNames.size(); index++) {
+ PTable table = PhoenixRuntime.getTable(conn, logicalNames.get(index));
List<PColumn> cls = table.getColumns();
for (int i = 0; i < cls.size(); i++) {
PColumn c = cls.get(i);
- if (c.getFamilyName() == null) continue; // Skip PK column
- byte[] family = c.getFamilyName().getBytes();
+ byte[] family = new byte[0];
+ if (c.getFamilyName() != null) // PK columns have no family and keep the empty one
+ family = c.getFamilyName().getBytes();
byte[] name = c.getName().getBytes();
- if (!columnMap.containsKey(family)) {
- columnMap.put(family, new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR));
+ byte[] cfn = Bytes.add(family,":".getBytes(), name);
+ if (!columnIndexes.containsKey(cfn)) {
+ columnIndexes.put(cfn, new Integer(columnIndex));
+ columnIndex++;
}
- Map<byte[], Integer> qualifier = columnMap.get(family);
- qualifier.put(name, i);
}
- tableMap.add(columnMap);
}
- return tableMap;
}
/**
* Find the column index which will replace the column name in
* the aggregated array and will be restored in Reducer
*
- * @param tableIndex Table index in tableNames list
* @param cell KeyValue for the column
* @return column index for the specified cell or -1 if was not found
*/
- private int findIndex(int tableIndex, Cell cell) {
- Map<byte[], Map<byte[], Integer>> columnMap = columnIndexes.get(tableIndex);
- Map<byte[], Integer> qualifiers = columnMap.get(Bytes.copy(cell.getFamilyArray(),
- cell.getFamilyOffset(), cell.getFamilyLength()));
- if (qualifiers != null) {
- Integer result = qualifiers.get(Bytes.copy(cell.getQualifierArray(),
- cell.getQualifierOffset(), cell.getQualifierLength()));
- if (result != null) {
- return result;
- }
+ private int findIndex(Cell cell) {
+ byte[] familyName = Bytes.copy(cell.getFamilyArray(), cell.getFamilyOffset(),
+ cell.getFamilyLength());
+ byte[] name = Bytes.copy(cell.getQualifierArray(), cell.getQualifierOffset(),
+ cell.getQualifierLength());
+ byte[] cfn = Bytes.add(familyName, ":".getBytes(), name);
+ if(columnIndexes.containsKey(cfn)) {
+ return columnIndexes.get(cfn);
}
return -1;
}
/**
- * Collect all column values for the same rowKey
+ * Collect all column values for the same Row. RowKey may be different if indexes are involved,
+ * so it writes a separate record for each unique RowKey
*
* @param context Current mapper context
- * @param tableIndex Table index in tableNames list
+ * @param tableName Name of the target table (an entry of the tableNames list)
* @param lkv List of KV values that will be combined in a single ImmutableBytesWritable
* @throws IOException
* @throws InterruptedException
*/
- private void writeAggregatedRow(Context context, int tableIndex, List<KeyValue> lkv)
+ private void writeAggregatedRow(Context context, String tableName, List<KeyValue> lkv)
throws IOException, InterruptedException {
ByteArrayOutputStream bos = new ByteArrayOutputStream(1024);
DataOutputStream outputStream = new DataOutputStream(bos);
- ImmutableBytesWritable outputKey = new ImmutableBytesWritable();
+ ImmutableBytesWritable outputKey =null;
if (!lkv.isEmpty()) {
- // All Key Values for the same row are supposed to be the same, so init rowKey only once
- Cell first = lkv.get(0);
- outputKey.set(first.getRowArray(), first.getRowOffset(), first.getRowLength());
for (KeyValue cell : lkv) {
- if (isEmptyCell(cell)) {
- continue;
- }
- int i = findIndex(tableIndex, cell);
- if (i == -1) {
- throw new IOException("No column found for KeyValue");
+ if (outputKey == null || Bytes.compareTo(outputKey.get(), outputKey.getOffset(),
+ outputKey.getLength(), cell.getRowArray(), cell.getRowOffset(), cell
+ .getRowLength()) != 0) {
+ // This is the first RowKey, or one that differs from the previous one
+ if (outputKey != null) { //It's a different RowKey, so we need to write it
+ ImmutableBytesWritable aggregatedArray =
+ new ImmutableBytesWritable(bos.toByteArray());
+ outputStream.close();
+ context.write(new TableRowkeyPair(tableName, outputKey), aggregatedArray);
+ }
+ outputKey = new ImmutableBytesWritable(cell.getRowArray(), cell.getRowOffset()
+ , cell.getRowLength());
+ bos = new ByteArrayOutputStream(1024);
+ outputStream = new DataOutputStream(bos);
}
- WritableUtils.writeVInt(outputStream, i);
+ /*
+ The order of aggregation: type, index of column, length of value, value itself
+ */
outputStream.writeByte(cell.getTypeByte());
+ int i = findIndex(cell);
+ WritableUtils.writeVInt(outputStream, i);
WritableUtils.writeVInt(outputStream, cell.getValueLength());
outputStream.write(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
+
}
+ ImmutableBytesWritable aggregatedArray = new ImmutableBytesWritable(bos.toByteArray());
+ outputStream.close();
+ context.write(new TableRowkeyPair(tableName, outputKey), aggregatedArray);
}
- ImmutableBytesWritable aggregatedArray = new ImmutableBytesWritable(bos.toByteArray());
- outputStream.close();
- context.write(new TableRowkeyPair(tableNames.get(tableIndex), outputKey), aggregatedArray);
}
- protected boolean isEmptyCell(KeyValue cell) {
- if (Bytes.compareTo(cell.getQualifierArray(), cell.getQualifierOffset(),
- cell.getQualifierLength(), QueryConstants.EMPTY_COLUMN_BYTES, 0,
- QueryConstants.EMPTY_COLUMN_BYTES.length) != 0)
- return false;
- else
- return true;
- }
-
-
@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
try {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/ec44a57c/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java
index 799b3dc..aa807c4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java
@@ -21,16 +21,18 @@ import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.sql.SQLException;
-import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.TreeMap;
import java.util.TreeSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;
+import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapreduce.Reducer;
@@ -42,7 +44,6 @@ import org.apache.phoenix.mapreduce.bulkload.TargetTableRefFunctions;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.schema.PColumn;
import org.apache.phoenix.schema.PTable;
-import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.Closeables;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.QueryUtil;
@@ -63,8 +64,8 @@ public class FormatToKeyValueReducer
protected List<String> tableNames;
protected List<String> logicalNames;
protected KeyValueBuilder builder;
- List<List<Pair<byte[], byte[]>>> columnIndexes;
- List<ImmutableBytesPtr> emptyFamilyName;
+ private Map<Integer, Pair<byte[], byte[]>> columnIndexes;
+ private Map<String, ImmutableBytesPtr> emptyFamilyName;
@Override
@@ -76,7 +77,6 @@ public class FormatToKeyValueReducer
for (Map.Entry<String, String> entry : conf) {
clientInfos.setProperty(entry.getKey(), entry.getValue());
}
-
try {
PhoenixConnection conn = (PhoenixConnection) QueryUtil.getConnectionOnServer(clientInfos, conf);
builder = conn.getKeyValueBuilder();
@@ -84,9 +84,6 @@ public class FormatToKeyValueReducer
final String logicalNamesConf = conf.get(FormatToBytesWritableMapper.LOGICAL_NAMES_CONFKEY);
tableNames = TargetTableRefFunctions.NAMES_FROM_JSON.apply(tableNamesConf);
logicalNames = TargetTableRefFunctions.NAMES_FROM_JSON.apply(logicalNamesConf);
-
- columnIndexes = new ArrayList<>(tableNames.size());
- emptyFamilyName = new ArrayList<>();
initColumnsMap(conn);
} catch (SQLException | ClassNotFoundException e) {
throw new RuntimeException(e);
@@ -94,24 +91,30 @@ public class FormatToKeyValueReducer
}
private void initColumnsMap(PhoenixConnection conn) throws SQLException {
- for (String tableName : logicalNames) {
- PTable table = PhoenixRuntime.getTable(conn, tableName);
- emptyFamilyName.add(SchemaUtil.getEmptyColumnFamilyPtr(table));
+ Map <byte[], Integer> indexMap = new TreeMap(Bytes.BYTES_COMPARATOR);
+ emptyFamilyName = new HashMap<>();
+ columnIndexes = new HashMap<>();
+ int columnIndex = 0;
+ for(int index = 0; index < logicalNames.size(); index++) {
+ PTable table = PhoenixRuntime.getTable(conn, logicalNames.get(index));
+ emptyFamilyName.put(tableNames.get(index), SchemaUtil.getEmptyColumnFamilyPtr(table));
List<PColumn> cls = table.getColumns();
- List<Pair<byte[], byte[]>> list = new ArrayList(cls.size());
for (int i = 0; i < cls.size(); i++) {
PColumn c = cls.get(i);
- if (c.getFamilyName() == null) {
- list.add(null); // Skip PK column
- continue;
+ byte[] family = new byte[0];
+ if (c.getFamilyName() != null) {
+ family = c.getFamilyName().getBytes();
}
- byte[] family = c.getFamilyName().getBytes();
byte[] name = c.getName().getBytes();
- list.add(new Pair(family, name));
+ byte[] cfn = Bytes.add(family,":".getBytes(), name);
+ Pair<byte[], byte[]> pair = new Pair(family, name);
+ if (!indexMap.containsKey(cfn)) {
+ indexMap.put(cfn, new Integer(columnIndex));
+ columnIndexes.put(new Integer(columnIndex), pair);
+ columnIndex++;
+ }
}
- columnIndexes.add(list);
}
-
}
@Override
@@ -119,15 +122,27 @@ public class FormatToKeyValueReducer
Reducer<TableRowkeyPair, ImmutableBytesWritable, TableRowkeyPair, KeyValue>.Context context)
throws IOException, InterruptedException {
TreeSet<KeyValue> map = new TreeSet<KeyValue>(KeyValue.COMPARATOR);
- int tableIndex = tableNames.indexOf(key.getTableName());
- List<Pair<byte[], byte[]>> columns = columnIndexes.get(tableIndex);
+ ImmutableBytesWritable rowKey = key.getRowkey();
for (ImmutableBytesWritable aggregatedArray : values) {
DataInputStream input = new DataInputStream(new ByteArrayInputStream(aggregatedArray.get()));
while (input.available() != 0) {
- int index = WritableUtils.readVInt(input);
- Pair<byte[], byte[]> pair = columns.get(index);
byte type = input.readByte();
- ImmutableBytesWritable value = null;
+ int index = WritableUtils.readVInt(input);
+ ImmutableBytesWritable family;
+ ImmutableBytesWritable name;
+ ImmutableBytesWritable value = QueryConstants.EMPTY_COLUMN_VALUE_BYTES_PTR;
+ if (index == -1) {
+ family = emptyFamilyName.get(key.getTableName());
+ name = QueryConstants.EMPTY_COLUMN_BYTES_PTR;
+ } else {
+ Pair<byte[], byte[]> pair = columnIndexes.get(index);
+ if(pair.getFirst() != null) {
+ family = new ImmutableBytesWritable(pair.getFirst());
+ } else {
+ family = emptyFamilyName.get(key.getTableName());
+ }
+ name = new ImmutableBytesWritable(pair.getSecond());
+ }
int len = WritableUtils.readVInt(input);
if (len > 0) {
byte[] array = new byte[len];
@@ -138,24 +153,16 @@ public class FormatToKeyValueReducer
KeyValue.Type kvType = KeyValue.Type.codeToType(type);
switch (kvType) {
case Put: // not null value
- kv = builder.buildPut(key.getRowkey(),
- new ImmutableBytesWritable(pair.getFirst()),
- new ImmutableBytesWritable(pair.getSecond()), value);
+ kv = builder.buildPut(key.getRowkey(), family, name, value);
break;
case DeleteColumn: // null value
- kv = builder.buildDeleteColumns(key.getRowkey(),
- new ImmutableBytesWritable(pair.getFirst()),
- new ImmutableBytesWritable(pair.getSecond()));
+ kv = builder.buildDeleteColumns(key.getRowkey(), family, name);
break;
default:
throw new IOException("Unsupported KeyValue type " + kvType);
}
map.add(kv);
}
- KeyValue empty = builder.buildPut(key.getRowkey(),
- emptyFamilyName.get(tableIndex),
- QueryConstants.EMPTY_COLUMN_BYTES_PTR, ByteUtil.EMPTY_BYTE_ARRAY_PTR);
- map.add(empty);
Closeables.closeQuietly(input);
}
context.setStatus("Read " + map.getClass());