You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2016/03/02 19:40:38 UTC
[2/3] phoenix git commit: PHOENIX-1973 Improve CsvBulkLoadTool
performance by moving keyvalue construction from map phase to reduce phase
(Sergey Soldatov)
PHOENIX-1973 Improve CsvBulkLoadTool performance by moving keyvalue construction from map phase to reduce phase (Sergey Soldatov)
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/1c06c5db
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/1c06c5db
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/1c06c5db
Branch: refs/heads/master
Commit: 1c06c5dbd1d04b3dc67e75eb505df8d62f1a09e4
Parents: 42e2b85
Author: James Taylor <jt...@salesforce.com>
Authored: Wed Mar 2 00:38:05 2016 -0800
Committer: James Taylor <jt...@salesforce.com>
Committed: Wed Mar 2 10:40:30 2016 -0800
----------------------------------------------------------------------
.../phoenix/mapreduce/AbstractBulkLoadTool.java | 6 +-
.../mapreduce/FormatToKeyValueMapper.java | 164 ++++++++++++++++---
.../mapreduce/FormatToKeyValueReducer.java | 126 ++++++++++++--
.../bulkload/TargetTableRefFunctions.java | 22 ++-
4 files changed, 280 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/1c06c5db/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
index 39ee4b1..ab2848f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
@@ -268,7 +269,7 @@ public abstract class AbstractBulkLoadTool extends Configured implements Tool {
job.setInputFormatClass(TextInputFormat.class);
job.setMapOutputKeyClass(TableRowkeyPair.class);
- job.setMapOutputValueClass(KeyValue.class);
+ job.setMapOutputValueClass(ImmutableBytesWritable.class);
job.setOutputKeyClass(TableRowkeyPair.class);
job.setOutputValueClass(KeyValue.class);
job.setReducerClass(FormatToKeyValueReducer.class);
@@ -276,7 +277,10 @@ public abstract class AbstractBulkLoadTool extends Configured implements Tool {
MultiHfileOutputFormat.configureIncrementalLoad(job, tablesToBeLoaded);
final String tableNamesAsJson = TargetTableRefFunctions.NAMES_TO_JSON.apply(tablesToBeLoaded);
+ final String logicalNamesAsJson = TargetTableRefFunctions.LOGICAN_NAMES_TO_JSON.apply(tablesToBeLoaded);
+
job.getConfiguration().set(FormatToKeyValueMapper.TABLE_NAMES_CONFKEY,tableNamesAsJson);
+ job.getConfiguration().set(FormatToKeyValueMapper.LOGICAL_NAMES_CONFKEY,logicalNamesAsJson);
// give subclasses their hook
setupJob(job);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/1c06c5db/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueMapper.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueMapper.java
index 7e115e5..8dbb4aa 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueMapper.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueMapper.java
@@ -17,30 +17,30 @@
*/
package org.apache.phoenix.mapreduce;
+import java.io.DataOutputStream;
import java.io.IOException;
import java.sql.SQLException;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
+import java.util.*;
import javax.annotation.Nullable;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.mapreduce.bulkload.TableRowkeyPair;
import org.apache.phoenix.mapreduce.bulkload.TargetTableRefFunctions;
import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
-import org.apache.phoenix.util.ColumnInfo;
-import org.apache.phoenix.util.PhoenixRuntime;
-import org.apache.phoenix.util.QueryUtil;
-import org.apache.phoenix.util.UpsertExecutor;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.util.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -59,7 +59,7 @@ import com.google.common.collect.Lists;
* to retrieve {@link ColumnInfo} from the target table.
*/
public abstract class FormatToKeyValueMapper<RECORD> extends Mapper<LongWritable, Text, TableRowkeyPair,
- KeyValue> {
+ ImmutableBytesWritable> {
protected static final Logger LOG = LoggerFactory.getLogger(FormatToKeyValueMapper.class);
@@ -79,6 +79,7 @@ public abstract class FormatToKeyValueMapper<RECORD> extends Mapper<LongWritable
/** Configuration key for the table names */
public static final String TABLE_NAMES_CONFKEY = "phoenix.mapreduce.import.tablenames";
+ public static final String LOGICAL_NAMES_CONFKEY = "phoenix.mapreduce.import.logicalnames";
/** Configuration key for the table configurations */
public static final String TABLE_CONFIG_CONFKEY = "phoenix.mapreduce.import.table.config";
@@ -94,8 +95,14 @@ public abstract class FormatToKeyValueMapper<RECORD> extends Mapper<LongWritable
protected UpsertExecutor<RECORD, ?> upsertExecutor;
protected ImportPreUpsertKeyValueProcessor preUpdateProcessor;
protected List<String> tableNames;
+ protected List<String> logicalNames;
protected MapperUpsertListener<RECORD> upsertListener;
+ /*
+ Lookup tables for column indexes. The position of each map in this List matches the position of its table in the tableNames List
+ */
+ protected List<Map<byte[], Map<byte[], Integer>>> columnIndexes;
+
protected abstract UpsertExecutor<RECORD,?> buildUpsertExecutor(Configuration conf);
protected abstract LineParser<RECORD> getLineParser();
@@ -112,13 +119,17 @@ public abstract class FormatToKeyValueMapper<RECORD> extends Mapper<LongWritable
try {
conn = (PhoenixConnection) QueryUtil.getConnection(clientInfos, conf);
+
+ final String tableNamesConf = conf.get(TABLE_NAMES_CONFKEY);
+ final String logicalNamesConf = conf.get(LOGICAL_NAMES_CONFKEY);
+ tableNames = TargetTableRefFunctions.NAMES_FROM_JSON.apply(tableNamesConf);
+ logicalNames = TargetTableRefFunctions.NAMES_FROM_JSON.apply(logicalNamesConf);
+
+ columnIndexes = initColumnIndexes();
} catch (SQLException | ClassNotFoundException e) {
throw new RuntimeException(e);
}
- final String tableNamesConf = conf.get(TABLE_NAMES_CONFKEY);
- tableNames = TargetTableRefFunctions.NAMES_FROM_JSON.apply(tableNamesConf);
-
upsertListener = new MapperUpsertListener<RECORD>(
context, conf.getBoolean(IGNORE_INVALID_ROW_CONFKEY, true));
upsertExecutor = buildUpsertExecutor(conf);
@@ -127,7 +138,8 @@ public abstract class FormatToKeyValueMapper<RECORD> extends Mapper<LongWritable
@SuppressWarnings("deprecation")
@Override
- protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
+ protected void map(LongWritable key, Text value, Context context) throws IOException,
+ InterruptedException {
if (conn == null) {
throw new RuntimeException("Connection not initialized.");
}
@@ -145,7 +157,7 @@ public abstract class FormatToKeyValueMapper<RECORD> extends Mapper<LongWritable
return;
}
upsertExecutor.execute(ImmutableList.<RECORD>of(record));
-
+ Map<Integer, List<KeyValue>> map = new HashMap<>();
Iterator<Pair<byte[], List<KeyValue>>> uncommittedDataIterator
= PhoenixRuntime.getUncommittedDataIterator(conn, true);
while (uncommittedDataIterator.hasNext()) {
@@ -153,24 +165,125 @@ public abstract class FormatToKeyValueMapper<RECORD> extends Mapper<LongWritable
List<KeyValue> keyValueList = kvPair.getSecond();
keyValueList = preUpdateProcessor.preUpsert(kvPair.getFirst(), keyValueList);
byte[] first = kvPair.getFirst();
- for (String tableName : tableNames) {
- if (Bytes.compareTo(Bytes.toBytes(tableName), first) != 0) {
- // skip edits for other tables
- continue;
- }
- for (KeyValue kv : keyValueList) {
- ImmutableBytesWritable outputKey = new ImmutableBytesWritable();
- outputKey.set(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength());
- context.write(new TableRowkeyPair(tableName, outputKey), kv);
+ // Create a list of KV for each table
+ for (int i = 0; i < tableNames.size(); i++) {
+ if (Bytes.compareTo(Bytes.toBytes(tableNames.get(i)), first) == 0) {
+ if (!map.containsKey(i)) {
+ map.put(i, new ArrayList<KeyValue>());
+ }
+ List<KeyValue> list = map.get(i);
+ for (KeyValue kv : keyValueList) {
+ list.add(kv);
+ }
+ break;
}
}
}
+ for(Map.Entry<Integer, List<KeyValue>> rowEntry : map.entrySet()) {
+ int tableIndex = rowEntry.getKey();
+ List<KeyValue> lkv = rowEntry.getValue();
+ // All KeyValues of the row are combined into a single byte array
+ writeAggregatedRow(context, tableIndex, lkv);
+ }
conn.rollback();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
+    /**
+     * Builds one lookup per target table: column family -> (column qualifier -> position
+     * of the column in the list returned by {@code PTable.getColumns()}).
+     * Columns without a family name (primary key columns) are left out of the lookup.
+     *
+     * @return the lookups, in the same order as the tableNames list
+     * @throws SQLException if the metadata of a table cannot be retrieved
+     */
+    private List<Map<byte[],Map<byte[], Integer>>> initColumnIndexes() throws SQLException {
+        List<Map<byte[], Map<byte[], Integer>>> lookups = new ArrayList<>();
+        for (int t = 0; t < tableNames.size(); t++) {
+            // Metadata is resolved through the logical name stored at the same position
+            PTable table = PhoenixRuntime.getTable(conn, logicalNames.get(t));
+            Map<byte[], Map<byte[], Integer>> byFamily = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+            List<PColumn> columns = table.getColumns();
+            for (int position = 0; position < columns.size(); position++) {
+                PColumn column = columns.get(position);
+                if (column.getFamilyName() == null) {
+                    continue; // Skip PK column
+                }
+                byte[] family = column.getFamilyName().getBytes();
+                byte[] qualifier = column.getName().getBytes();
+                Map<byte[], Integer> byQualifier = byFamily.get(family);
+                if (byQualifier == null) {
+                    byQualifier = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
+                    byFamily.put(family, byQualifier);
+                }
+                byQualifier.put(qualifier, position);
+            }
+            lookups.add(byFamily);
+        }
+        return lookups;
+    }
+
+    /**
+     * Resolves the position of the cell's column in the lookup built for the given table.
+     * That position replaces the column name in the aggregated array and is turned back
+     * into a column in the Reducer.
+     *
+     * @param tableIndex Table index in tableNames list
+     * @param cell KeyValue for the column
+     * @return column index for the specified cell or -1 if was not found
+     */
+    private int findIndex(int tableIndex, Cell cell) {
+        Map<byte[], Map<byte[], Integer>> byFamily = columnIndexes.get(tableIndex);
+        byte[] family = Bytes.copy(cell.getFamilyArray(),
+                cell.getFamilyOffset(), cell.getFamilyLength());
+        Map<byte[], Integer> byQualifier = byFamily.get(family);
+        if (byQualifier == null) {
+            // Unknown column family
+            return -1;
+        }
+        byte[] qualifier = Bytes.copy(cell.getQualifierArray(),
+                cell.getQualifierOffset(), cell.getQualifierLength());
+        Integer position = byQualifier.get(qualifier);
+        return position == null ? -1 : position;
+    }
+
+    /**
+     * Packs all column values of one row into a single byte array and emits it.
+     * Each cell, except the empty-column cell, is encoded as: column index (vInt),
+     * KeyValue type (one byte), value length (vInt), value bytes.
+     *
+     * @param context Current mapper context
+     * @param tableIndex Table index in tableNames list
+     * @param lkv List of KV values that will be combined in a single ImmutableBytesWritable
+     * @throws IOException if a cell has no matching column or the write fails
+     * @throws InterruptedException
+     */
+    private void writeAggregatedRow(Context context, int tableIndex, List<KeyValue> lkv)
+            throws IOException, InterruptedException {
+        TrustedByteArrayOutputStream buffer = new TrustedByteArrayOutputStream(1024);
+        DataOutputStream out = new DataOutputStream(buffer);
+        ImmutableBytesWritable rowKey = new ImmutableBytesWritable();
+        if (!lkv.isEmpty()) {
+            // All KeyValues in the list are expected to share one row key, so take it from the first
+            Cell head = lkv.get(0);
+            rowKey.set(head.getRowArray(), head.getRowOffset(), head.getRowLength());
+        }
+        for (KeyValue kv : lkv) {
+            if (isEmptyCell(kv)) {
+                continue;
+            }
+            int columnIndex = findIndex(tableIndex, kv);
+            if (columnIndex == -1) {
+                throw new IOException("No column found for KeyValue");
+            }
+            WritableUtils.writeVInt(out, columnIndex);
+            out.writeByte(kv.getTypeByte());
+            WritableUtils.writeVInt(out, kv.getValueLength());
+            out.write(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength());
+        }
+        ImmutableBytesWritable payload = new ImmutableBytesWritable(buffer.toByteArray());
+        out.close();
+        context.write(new TableRowkeyPair(tableNames.get(tableIndex), rowKey), payload);
+    }
+
+    /**
+     * Tells whether the cell's qualifier equals {@code QueryConstants.EMPTY_COLUMN_BYTES}.
+     *
+     * @param cell the KeyValue to inspect
+     * @return true if the qualifier bytes match the empty column qualifier, false otherwise
+     */
+    protected boolean isEmptyCell(KeyValue cell) {
+        return Bytes.compareTo(cell.getQualifierArray(), cell.getQualifierOffset(),
+                cell.getQualifierLength(), QueryConstants.EMPTY_COLUMN_BYTES, 0,
+                QueryConstants.EMPTY_COLUMN_BYTES.length) == 0;
+    }
+
+
@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
try {
@@ -223,11 +336,12 @@ public abstract class FormatToKeyValueMapper<RECORD> extends Mapper<LongWritable
@VisibleForTesting
static class MapperUpsertListener<T> implements UpsertExecutor.UpsertListener<T> {
- private final Mapper<LongWritable, Text, TableRowkeyPair, KeyValue>.Context context;
+ private final Mapper<LongWritable, Text,
+ TableRowkeyPair, ImmutableBytesWritable>.Context context;
private final boolean ignoreRecordErrors;
private MapperUpsertListener(
- Mapper<LongWritable, Text, TableRowkeyPair, KeyValue>.Context context,
+ Mapper<LongWritable, Text, TableRowkeyPair, ImmutableBytesWritable>.Context context,
boolean ignoreRecordErrors) {
this.context = context;
this.ignoreRecordErrors = ignoreRecordErrors;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/1c06c5db/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java
index 5d00656..fb61855 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java
@@ -17,36 +17,142 @@
*/
package org.apache.phoenix.mapreduce;
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
import java.io.IOException;
-import java.util.TreeSet;
+import java.sql.SQLException;
+import java.util.*;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
+import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.mapreduce.bulkload.TableRowkeyPair;
+import org.apache.phoenix.mapreduce.bulkload.TargetTableRefFunctions;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.util.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Reducer class for the bulkload jobs.
* Performs similar functionality to {@link KeyValueSortReducer}
*/
public class FormatToKeyValueReducer
- extends Reducer<TableRowkeyPair,KeyValue,TableRowkeyPair,KeyValue> {
+ extends Reducer<TableRowkeyPair,ImmutableBytesWritable,TableRowkeyPair,KeyValue> {
+
+ protected static final Logger LOG = LoggerFactory.getLogger(FormatToKeyValueReducer.class);
+
+
+ protected List<String> tableNames;
+ protected List<String> logicalNames;
+ protected KeyValueBuilder builder;
+ List<List<Pair<byte[], byte[]>>> columnIndexes;
+ List<ImmutableBytesPtr> emptyFamilyName;
+
+
+    /**
+     * Prepares the reducer: copies the job configuration into client properties, opens a
+     * Phoenix connection to obtain the KeyValueBuilder and the column metadata of every
+     * target table, and closes the connection again once that metadata has been read.
+     *
+     * @param context the reducer context supplying the job configuration
+     * @throws RuntimeException wrapping any SQLException or ClassNotFoundException raised
+     *         while connecting, reading metadata or closing the connection
+     */
+    @Override
+    protected void setup(Context context) throws IOException, InterruptedException {
+        Configuration conf = context.getConfiguration();
+
+        // pass client configuration into driver
+        Properties clientInfos = new Properties();
+        for (Map.Entry<String, String> entry : conf) {
+            clientInfos.setProperty(entry.getKey(), entry.getValue());
+        }
+
+        // The connection is only used here, so try-with-resources releases it instead of
+        // leaking one connection per reducer task.
+        // NOTE(review): assumes the KeyValueBuilder stays usable after the connection is
+        // closed - confirm against PhoenixConnection.getKeyValueBuilder().
+        try (PhoenixConnection conn =
+                (PhoenixConnection) QueryUtil.getConnection(clientInfos, conf)) {
+            builder = conn.getKeyValueBuilder();
+            final String tableNamesConf = conf.get(FormatToKeyValueMapper.TABLE_NAMES_CONFKEY);
+            final String logicalNamesConf = conf.get(FormatToKeyValueMapper.LOGICAL_NAMES_CONFKEY);
+            tableNames = TargetTableRefFunctions.NAMES_FROM_JSON.apply(tableNamesConf);
+            logicalNames = TargetTableRefFunctions.NAMES_FROM_JSON.apply(logicalNamesConf);
+
+            columnIndexes = new ArrayList<>(tableNames.size());
+            emptyFamilyName = new ArrayList<>();
+            initColumnsMap(conn);
+        } catch (SQLException | ClassNotFoundException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Fills {@code columnIndexes} and {@code emptyFamilyName} with one entry per logical
+     * table name. For each table the list holds a (family, qualifier) pair at the position
+     * of every column; columns without a family name (primary key columns) get a
+     * {@code null} placeholder so that positions keep matching the column indexes
+     * written by the mapper.
+     *
+     * @param conn connection used to look up the table metadata
+     * @throws SQLException if the metadata of a table cannot be retrieved
+     */
+    private void initColumnsMap(PhoenixConnection conn) throws SQLException {
+        for (String tableName : logicalNames) {
+            PTable table = PhoenixRuntime.getTable(conn, tableName);
+            emptyFamilyName.add(SchemaUtil.getEmptyColumnFamilyPtr(table));
+            List<PColumn> cls = table.getColumns();
+            // Parameterized types instead of raw ArrayList/Pair avoid unchecked conversions
+            List<Pair<byte[], byte[]>> list = new ArrayList<>(cls.size());
+            for (PColumn c : cls) {
+                if (c.getFamilyName() == null) {
+                    list.add(null); // Skip PK column, but keep its position occupied
+                    continue;
+                }
+                byte[] family = c.getFamilyName().getBytes();
+                byte[] name = c.getName().getBytes();
+                list.add(new Pair<byte[], byte[]>(family, name));
+            }
+            columnIndexes.add(list);
+        }
+    }
@Override
- protected void reduce(TableRowkeyPair key, Iterable<KeyValue> values,
- Reducer<TableRowkeyPair, KeyValue, TableRowkeyPair, KeyValue>.Context context)
+ protected void reduce(TableRowkeyPair key, Iterable<ImmutableBytesWritable> values,
+ Reducer<TableRowkeyPair, ImmutableBytesWritable, TableRowkeyPair, KeyValue>.Context context)
throws IOException, InterruptedException {
TreeSet<KeyValue> map = new TreeSet<KeyValue>(KeyValue.COMPARATOR);
- for (KeyValue kv: values) {
- try {
- map.add(kv.clone());
- } catch (CloneNotSupportedException e) {
- throw new java.io.IOException(e);
+ int tableIndex = tableNames.indexOf(key.getTableName());
+ List<Pair<byte[], byte[]>> columns = columnIndexes.get(tableIndex);
+ for (ImmutableBytesWritable aggregatedArray : values) {
+ DataInputStream input = new DataInputStream(new ByteArrayInputStream(aggregatedArray.get()));
+ while (input.available()!= 0) {
+ int index = WritableUtils.readVInt(input);
+ Pair<byte[], byte[]> pair = columns.get(index);
+ byte type = input.readByte();
+ ImmutableBytesWritable value = null;
+ int len = WritableUtils.readVInt(input);
+ if (len > 0) {
+ byte[] array = new byte[len];
+ input.read(array);
+ value = new ImmutableBytesWritable(array);
+ }
+ KeyValue kv;
+ KeyValue.Type kvType = KeyValue.Type.codeToType(type);
+ switch (kvType) {
+ case Put: // not null value
+ kv = builder.buildPut(key.getRowkey(),
+ new ImmutableBytesWritable(pair.getFirst()),
+ new ImmutableBytesWritable(pair.getSecond()), value);
+ break;
+ case DeleteColumn: // null value
+ kv = builder.buildDeleteColumns(key.getRowkey(),
+ new ImmutableBytesWritable(pair.getFirst()),
+ new ImmutableBytesWritable(pair.getSecond()));
+ break;
+ default:
+ throw new IOException("Unsupported KeyValue type " + kvType);
+ }
+ map.add(kv);
}
+ KeyValue empty = builder.buildPut(key.getRowkey(),
+ emptyFamilyName.get(tableIndex),
+ QueryConstants.EMPTY_COLUMN_BYTES_PTR, ByteUtil.EMPTY_BYTE_ARRAY_PTR);
+ map.add(empty);
+ Closeables.closeQuietly(input);
}
context.setStatus("Read " + map.getClass());
int index = 0;
- for (KeyValue kv: map) {
+ for (KeyValue kv : map) {
context.write(key, kv);
if (++index % 100 == 0) context.setStatus("Wrote " + index);
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/1c06c5db/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/bulkload/TargetTableRefFunctions.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/bulkload/TargetTableRefFunctions.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/bulkload/TargetTableRefFunctions.java
index d786842..e02065f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/bulkload/TargetTableRefFunctions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/bulkload/TargetTableRefFunctions.java
@@ -78,7 +78,25 @@ public class TargetTableRefFunctions {
}
};
- public static Function<String,List<String>> NAMES_FROM_JSON = new Function<String,List<String>>() {
+    /**
+     * Converts the logical names of the given target tables, in input order, to a JSON
+     * string. NOTE: the constant name is misspelled (LOGICAN for LOGICAL); it is kept
+     * as is because AbstractBulkLoadTool references it by this name.
+     */
+    public static Function<List<TargetTableRef>,String> LOGICAN_NAMES_TO_JSON = new Function<List<TargetTableRef>,String>() {
+
+        @Override
+        public String apply(List<TargetTableRef> input) {
+            List<String> logicalNames = Lists.newArrayListWithCapacity(input.size());
+            for (TargetTableRef ref : input) {
+                logicalNames.add(ref.getLogicalName());
+            }
+            try {
+                return new ObjectMapper().writeValueAsString(logicalNames);
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
+    };
+
+ public static Function<String,List<String>> NAMES_FROM_JSON = new Function<String,List<String>>() {
@SuppressWarnings("unchecked")
@Override
@@ -92,4 +110,4 @@ public class TargetTableRefFunctions {
}
};
- }
+}