You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ma...@apache.org on 2016/03/29 21:50:35 UTC
[02/30] phoenix git commit: Revert "PHOENIX-1973 Improve CsvBulkLoadTool performance by moving keyvalue construction from map phase to reduce phase(Sergey Soldatov)"
Revert "PHOENIX-1973 Improve CsvBulkLoadTool performance by moving keyvalue construction from map phase to reduce phase(Sergey Soldatov)"
This reverts commit e797b36c2ce42e9b9fd6b37fd8b9f79f79d6f18f.
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/c083bc82
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/c083bc82
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/c083bc82
Branch: refs/heads/calcite
Commit: c083bc825e2f5fd99f7343a784ab5700dbcff6df
Parents: 4f74323
Author: James Taylor <jt...@salesforce.com>
Authored: Sat Feb 27 09:57:01 2016 -0800
Committer: James Taylor <jt...@salesforce.com>
Committed: Sat Feb 27 09:57:01 2016 -0800
----------------------------------------------------------------------
.../phoenix/mapreduce/AbstractBulkLoadTool.java | 6 +-
.../mapreduce/FormatToKeyValueMapper.java | 164 +++----------------
.../mapreduce/FormatToKeyValueReducer.java | 127 ++------------
.../bulkload/TargetTableRefFunctions.java | 22 +--
4 files changed, 38 insertions(+), 281 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c083bc82/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
index ab2848f..39ee4b1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
@@ -41,7 +41,6 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
@@ -269,7 +268,7 @@ public abstract class AbstractBulkLoadTool extends Configured implements Tool {
job.setInputFormatClass(TextInputFormat.class);
job.setMapOutputKeyClass(TableRowkeyPair.class);
- job.setMapOutputValueClass(ImmutableBytesWritable.class);
+ job.setMapOutputValueClass(KeyValue.class);
job.setOutputKeyClass(TableRowkeyPair.class);
job.setOutputValueClass(KeyValue.class);
job.setReducerClass(FormatToKeyValueReducer.class);
@@ -277,10 +276,7 @@ public abstract class AbstractBulkLoadTool extends Configured implements Tool {
MultiHfileOutputFormat.configureIncrementalLoad(job, tablesToBeLoaded);
final String tableNamesAsJson = TargetTableRefFunctions.NAMES_TO_JSON.apply(tablesToBeLoaded);
- final String logicalNamesAsJson = TargetTableRefFunctions.LOGICAN_NAMES_TO_JSON.apply(tablesToBeLoaded);
-
job.getConfiguration().set(FormatToKeyValueMapper.TABLE_NAMES_CONFKEY,tableNamesAsJson);
- job.getConfiguration().set(FormatToKeyValueMapper.LOGICAL_NAMES_CONFKEY,logicalNamesAsJson);
// give subclasses their hook
setupJob(job);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c083bc82/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueMapper.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueMapper.java
index 95b099e..7e115e5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueMapper.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueMapper.java
@@ -17,30 +17,30 @@
*/
package org.apache.phoenix.mapreduce;
-import java.io.DataOutputStream;
import java.io.IOException;
import java.sql.SQLException;
-import java.util.*;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
import javax.annotation.Nullable;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.mapreduce.bulkload.TableRowkeyPair;
import org.apache.phoenix.mapreduce.bulkload.TargetTableRefFunctions;
import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
-import org.apache.phoenix.query.QueryConstants;
-import org.apache.phoenix.schema.PColumn;
-import org.apache.phoenix.schema.PTable;
-import org.apache.phoenix.util.*;
+import org.apache.phoenix.util.ColumnInfo;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.UpsertExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -59,7 +59,7 @@ import com.google.common.collect.Lists;
* to retrieve {@link ColumnInfo} from the target table.
*/
public abstract class FormatToKeyValueMapper<RECORD> extends Mapper<LongWritable, Text, TableRowkeyPair,
- ImmutableBytesWritable> {
+ KeyValue> {
protected static final Logger LOG = LoggerFactory.getLogger(FormatToKeyValueMapper.class);
@@ -79,7 +79,6 @@ public abstract class FormatToKeyValueMapper<RECORD> extends Mapper<LongWritable
/** Configuration key for the table names */
public static final String TABLE_NAMES_CONFKEY = "phoenix.mapreduce.import.tablenames";
- public static final String LOGICAL_NAMES_CONFKEY = "phoenix.mapreduce.import.logicalnames";
/** Configuration key for the table configurations */
public static final String TABLE_CONFIG_CONFKEY = "phoenix.mapreduce.import.table.config";
@@ -95,14 +94,8 @@ public abstract class FormatToKeyValueMapper<RECORD> extends Mapper<LongWritable
protected UpsertExecutor<RECORD, ?> upsertExecutor;
protected ImportPreUpsertKeyValueProcessor preUpdateProcessor;
protected List<String> tableNames;
- protected List<String> logicalNames;
protected MapperUpsertListener<RECORD> upsertListener;
- /*
- lookup table for column index. Index in the List matches to the index in tableNames List
- */
- protected List<Map<byte[], Map<byte[], Integer>>> columnIndexes;
-
protected abstract UpsertExecutor<RECORD,?> buildUpsertExecutor(Configuration conf);
protected abstract LineParser<RECORD> getLineParser();
@@ -119,17 +112,13 @@ public abstract class FormatToKeyValueMapper<RECORD> extends Mapper<LongWritable
try {
conn = (PhoenixConnection) QueryUtil.getConnection(clientInfos, conf);
-
- final String tableNamesConf = conf.get(TABLE_NAMES_CONFKEY);
- final String logicalNamesConf = conf.get(LOGICAL_NAMES_CONFKEY);
- tableNames = TargetTableRefFunctions.NAMES_FROM_JSON.apply(tableNamesConf);
- logicalNames = TargetTableRefFunctions.NAMES_FROM_JSON.apply(logicalNamesConf);
-
- columnIndexes = initColumnIndexes();
} catch (SQLException | ClassNotFoundException e) {
throw new RuntimeException(e);
}
+ final String tableNamesConf = conf.get(TABLE_NAMES_CONFKEY);
+ tableNames = TargetTableRefFunctions.NAMES_FROM_JSON.apply(tableNamesConf);
+
upsertListener = new MapperUpsertListener<RECORD>(
context, conf.getBoolean(IGNORE_INVALID_ROW_CONFKEY, true));
upsertExecutor = buildUpsertExecutor(conf);
@@ -138,8 +127,7 @@ public abstract class FormatToKeyValueMapper<RECORD> extends Mapper<LongWritable
@SuppressWarnings("deprecation")
@Override
- protected void map(LongWritable key, Text value, Context context) throws IOException,
- InterruptedException {
+ protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
if (conn == null) {
throw new RuntimeException("Connection not initialized.");
}
@@ -157,7 +145,7 @@ public abstract class FormatToKeyValueMapper<RECORD> extends Mapper<LongWritable
return;
}
upsertExecutor.execute(ImmutableList.<RECORD>of(record));
- Map<Integer, List<KeyValue>> map = new HashMap<>();
+
Iterator<Pair<byte[], List<KeyValue>>> uncommittedDataIterator
= PhoenixRuntime.getUncommittedDataIterator(conn, true);
while (uncommittedDataIterator.hasNext()) {
@@ -165,125 +153,24 @@ public abstract class FormatToKeyValueMapper<RECORD> extends Mapper<LongWritable
List<KeyValue> keyValueList = kvPair.getSecond();
keyValueList = preUpdateProcessor.preUpsert(kvPair.getFirst(), keyValueList);
byte[] first = kvPair.getFirst();
- // Create a list of KV for each table
- for (int i = 0; i < tableNames.size(); i++) {
- if (Bytes.compareTo(Bytes.toBytes(tableNames.get(i)), first) == 0) {
- if (!map.containsKey(i)) {
- map.put(i, new ArrayList<KeyValue>());
- }
- List<KeyValue> list = map.get(i);
- for (KeyValue kv : keyValueList) {
- list.add(kv);
- }
- break;
+ for (String tableName : tableNames) {
+ if (Bytes.compareTo(Bytes.toBytes(tableName), first) != 0) {
+ // skip edits for other tables
+ continue;
+ }
+ for (KeyValue kv : keyValueList) {
+ ImmutableBytesWritable outputKey = new ImmutableBytesWritable();
+ outputKey.set(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength());
+ context.write(new TableRowkeyPair(tableName, outputKey), kv);
}
}
}
- for(Map.Entry<Integer, List<KeyValue>> rowEntry : map.entrySet()) {
- int tableIndex = rowEntry.getKey();
- List<KeyValue> lkv = rowEntry.getValue();
- // All KV values combines to a single byte array
- writeAggregatedRow(context, tableIndex, lkv);
- }
conn.rollback();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
- private List<Map<byte[],Map<byte[], Integer>>> initColumnIndexes() throws SQLException {
- List<Map<byte[], Map<byte[], Integer>>> tableMap = new ArrayList<>();
- int tableIndex;
- for (tableIndex = 0; tableIndex < tableNames.size(); tableIndex++) {
- PTable table = PhoenixRuntime.getTable(conn, logicalNames.get(tableIndex));
- Map<byte[], Map<byte[], Integer>> columnMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
- List<PColumn> cls = table.getColumns();
- for(int i = 0; i < cls.size(); i++) {
- PColumn c = cls.get(i);
- if(c.getFamilyName() == null) continue; // Skip PK column
- byte[] family = c.getFamilyName().getBytes();
- byte[] name = c.getName().getBytes();
- if(!columnMap.containsKey(family)) {
- columnMap.put(family, new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR));
- }
- Map<byte[], Integer> qualifier = columnMap.get(family);
- qualifier.put(name, i);
- }
- tableMap.add(columnMap);
- }
- return tableMap;
- }
-
- /**
- * Find the column index which will replace the column name in
- * the aggregated array and will be restored in Reducer
- *
- * @param tableIndex Table index in tableNames list
- * @param cell KeyValue for the column
- * @return column index for the specified cell or -1 if was not found
- */
- private int findIndex(int tableIndex, Cell cell) {
- Map<byte[], Map<byte[], Integer>> columnMap = columnIndexes.get(tableIndex);
- Map<byte[], Integer> qualifiers = columnMap.get(Bytes.copy(cell.getFamilyArray(),
- cell.getFamilyOffset(), cell.getFamilyLength()));
- if(qualifiers!= null) {
- Integer result = qualifiers.get(Bytes.copy(cell.getQualifierArray(),
- cell.getQualifierOffset(), cell.getQualifierLength()));
- if(result!=null) {
- return result;
- }
- }
- return -1;
- }
-
- /**
- * Collect all column values for the same rowKey
- *
- * @param context Current mapper context
- * @param tableIndex Table index in tableNames list
- * @param lkv List of KV values that will be combined in a single ImmutableBytesWritable
- * @throws IOException
- * @throws InterruptedException
- */
-
- private void writeAggregatedRow(Context context, int tableIndex, List<KeyValue> lkv)
- throws IOException, InterruptedException {
- TrustedByteArrayOutputStream bos = new TrustedByteArrayOutputStream(1024);
- DataOutputStream outputStream = new DataOutputStream(bos);
- ImmutableBytesWritable outputKey = new ImmutableBytesWritable();
- if (!lkv.isEmpty()) {
- // All Key Values for the same row are supposed to be the same, so init rowKey only once
- Cell first = lkv.get(0);
- outputKey.set(first.getRowArray(), first.getRowOffset(), first.getRowLength());
- for (KeyValue cell : lkv) {
- if(isEmptyCell(cell)) {
- continue;
- }
- int i = findIndex(tableIndex, cell);
- if (i == -1) {
- throw new IOException("No column found for KeyValue");
- }
- WritableUtils.writeVInt(outputStream, i);
- outputStream.writeByte(cell.getTypeByte());
- WritableUtils.writeVInt(outputStream, cell.getValueLength());
- outputStream.write(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
- }
- }
- ImmutableBytesWritable aggregatedArray = new ImmutableBytesWritable(bos.toByteArray());
- outputStream.close();
- context.write(new TableRowkeyPair(Integer.toString(tableIndex), outputKey), aggregatedArray);
- }
-
- protected boolean isEmptyCell(KeyValue cell) {
- if (Bytes.compareTo(cell.getQualifierArray(), cell.getQualifierOffset(),
- cell.getQualifierLength(), QueryConstants.EMPTY_COLUMN_BYTES, 0,
- QueryConstants.EMPTY_COLUMN_BYTES.length) != 0)
- return false;
- else
- return true;
- }
-
-
@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
try {
@@ -336,12 +223,11 @@ public abstract class FormatToKeyValueMapper<RECORD> extends Mapper<LongWritable
@VisibleForTesting
static class MapperUpsertListener<T> implements UpsertExecutor.UpsertListener<T> {
- private final Mapper<LongWritable, Text,
- TableRowkeyPair, ImmutableBytesWritable>.Context context;
+ private final Mapper<LongWritable, Text, TableRowkeyPair, KeyValue>.Context context;
private final boolean ignoreRecordErrors;
private MapperUpsertListener(
- Mapper<LongWritable, Text, TableRowkeyPair, ImmutableBytesWritable>.Context context,
+ Mapper<LongWritable, Text, TableRowkeyPair, KeyValue>.Context context,
boolean ignoreRecordErrors) {
this.context = context;
this.ignoreRecordErrors = ignoreRecordErrors;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c083bc82/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java
index 0f90e45..5d00656 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java
@@ -17,143 +17,36 @@
*/
package org.apache.phoenix.mapreduce;
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
import java.io.IOException;
-import java.sql.SQLException;
-import java.util.*;
+import java.util.TreeSet;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
-import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
-import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.mapreduce.bulkload.TableRowkeyPair;
-import org.apache.phoenix.mapreduce.bulkload.TargetTableRefFunctions;
-import org.apache.phoenix.query.QueryConstants;
-import org.apache.phoenix.schema.PColumn;
-import org.apache.phoenix.schema.PTable;
-import org.apache.phoenix.util.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* Reducer class for the bulkload jobs.
* Performs similar functionality to {@link KeyValueSortReducer}
*/
public class FormatToKeyValueReducer
- extends Reducer<TableRowkeyPair,ImmutableBytesWritable,TableRowkeyPair,KeyValue> {
-
- protected static final Logger LOG = LoggerFactory.getLogger(FormatToKeyValueReducer.class);
-
-
- protected List<String> tableNames;
- protected List<String> logicalNames;
- protected KeyValueBuilder builder;
- List<List<Pair<byte[], byte[]>>> columnIndexes;
- List<ImmutableBytesPtr> emptyFamilyName;
-
-
- @Override
- protected void setup(Context context) throws IOException, InterruptedException {
- Configuration conf = context.getConfiguration();
-
- // pass client configuration into driver
- Properties clientInfos = new Properties();
- for (Map.Entry<String, String> entry : conf) {
- clientInfos.setProperty(entry.getKey(), entry.getValue());
- }
-
- try {
- PhoenixConnection conn = (PhoenixConnection) QueryUtil.getConnection(clientInfos, conf);
- builder = conn.getKeyValueBuilder();
- final String tableNamesConf = conf.get(FormatToKeyValueMapper.TABLE_NAMES_CONFKEY);
- final String logicalNamesConf = conf.get(FormatToKeyValueMapper.LOGICAL_NAMES_CONFKEY);
- tableNames = TargetTableRefFunctions.NAMES_FROM_JSON.apply(tableNamesConf);
- logicalNames = TargetTableRefFunctions.NAMES_FROM_JSON.apply(logicalNamesConf);
-
- columnIndexes = new ArrayList<>(tableNames.size());
- emptyFamilyName = new ArrayList<>();
- initColumnsMap(conn);
- } catch (SQLException | ClassNotFoundException e) {
- throw new RuntimeException(e);
- }
- }
-
- private void initColumnsMap(PhoenixConnection conn) throws SQLException {
- for (String tableName : logicalNames) {
- PTable table = PhoenixRuntime.getTable(conn, tableName);
- emptyFamilyName.add(SchemaUtil.getEmptyColumnFamilyPtr(table));
- List<PColumn> cls = table.getColumns();
- List<Pair<byte[], byte[]>> list = new ArrayList(cls.size());
- for(int i = 0; i < cls.size(); i++) {
- PColumn c = cls.get(i);
- if(c.getFamilyName() == null) {
- list.add(null); // Skip PK column
- continue;
- }
- byte[] family = c.getFamilyName().getBytes();
- byte[] name = c.getName().getBytes();
- list.add(new Pair(family, name));
- }
- columnIndexes.add(list);
- }
-
- }
+ extends Reducer<TableRowkeyPair,KeyValue,TableRowkeyPair,KeyValue> {
@Override
- protected void reduce(TableRowkeyPair key, Iterable<ImmutableBytesWritable> values,
- Reducer<TableRowkeyPair, ImmutableBytesWritable, TableRowkeyPair, KeyValue>.Context context)
+ protected void reduce(TableRowkeyPair key, Iterable<KeyValue> values,
+ Reducer<TableRowkeyPair, KeyValue, TableRowkeyPair, KeyValue>.Context context)
throws IOException, InterruptedException {
TreeSet<KeyValue> map = new TreeSet<KeyValue>(KeyValue.COMPARATOR);
- int tableIndex = Integer.parseInt(key.getTableName());
- key.setTableName(tableNames.get(tableIndex));
- List<Pair<byte[], byte[]>> columns = columnIndexes.get(tableIndex);
- for (ImmutableBytesWritable aggregatedArray : values) {
- DataInputStream input = new DataInputStream(new ByteArrayInputStream(aggregatedArray.get()));
- while (input.available()!= 0) {
- int index = WritableUtils.readVInt(input);
- Pair<byte[], byte[]> pair = columns.get(index);
- byte type = input.readByte();
- ImmutableBytesWritable value = null;
- int len = WritableUtils.readVInt(input);
- if (len > 0) {
- byte[] array = new byte[len];
- input.read(array);
- value = new ImmutableBytesWritable(array);
- }
- KeyValue kv;
- KeyValue.Type kvType = KeyValue.Type.codeToType(type);
- switch (kvType) {
- case Put: // not null value
- kv = builder.buildPut(key.getRowkey(),
- new ImmutableBytesWritable(pair.getFirst()),
- new ImmutableBytesWritable(pair.getSecond()), value);
- break;
- case DeleteColumn: // null value
- kv = builder.buildDeleteColumns(key.getRowkey(),
- new ImmutableBytesWritable(pair.getFirst()),
- new ImmutableBytesWritable(pair.getSecond()));
- break;
- default:
- throw new IOException("Unsupported KeyValue type " + kvType);
- }
- map.add(kv);
+ for (KeyValue kv: values) {
+ try {
+ map.add(kv.clone());
+ } catch (CloneNotSupportedException e) {
+ throw new java.io.IOException(e);
}
- KeyValue empty = builder.buildPut(key.getRowkey(),
- emptyFamilyName.get(tableIndex),
- QueryConstants.EMPTY_COLUMN_BYTES_PTR, ByteUtil.EMPTY_BYTE_ARRAY_PTR);
- map.add(empty);
- Closeables.closeQuietly(input);
}
context.setStatus("Read " + map.getClass());
int index = 0;
- for (KeyValue kv : map) {
+ for (KeyValue kv: map) {
context.write(key, kv);
if (++index % 100 == 0) context.setStatus("Wrote " + index);
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c083bc82/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/bulkload/TargetTableRefFunctions.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/bulkload/TargetTableRefFunctions.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/bulkload/TargetTableRefFunctions.java
index e02065f..d786842 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/bulkload/TargetTableRefFunctions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/bulkload/TargetTableRefFunctions.java
@@ -78,25 +78,7 @@ public class TargetTableRefFunctions {
}
};
- public static Function<List<TargetTableRef>,String> LOGICAN_NAMES_TO_JSON = new Function<List<TargetTableRef>,String>() {
-
- @Override
- public String apply(List<TargetTableRef> input) {
- try {
- List<String> tableNames = Lists.newArrayListWithCapacity(input.size());
- for(TargetTableRef table : input) {
- tableNames.add(table.getLogicalName());
- }
- ObjectMapper mapper = new ObjectMapper();
- return mapper.writeValueAsString(tableNames);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
-
- }
- };
-
- public static Function<String,List<String>> NAMES_FROM_JSON = new Function<String,List<String>>() {
+ public static Function<String,List<String>> NAMES_FROM_JSON = new Function<String,List<String>>() {
@SuppressWarnings("unchecked")
@Override
@@ -110,4 +92,4 @@ public class TargetTableRefFunctions {
}
};
-}
+ }