You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ma...@apache.org on 2016/03/29 21:50:35 UTC

[02/30] phoenix git commit: Revert "PHOENIX-1973 Improve CsvBulkLoadTool performance by moving keyvalue construction from map phase to reduce phase(Sergey Soldatov)"

Revert "PHOENIX-1973 Improve CsvBulkLoadTool performance by moving keyvalue construction from map phase to reduce phase(Sergey Soldatov)"

This reverts commit e797b36c2ce42e9b9fd6b37fd8b9f79f79d6f18f.


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/c083bc82
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/c083bc82
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/c083bc82

Branch: refs/heads/calcite
Commit: c083bc825e2f5fd99f7343a784ab5700dbcff6df
Parents: 4f74323
Author: James Taylor <jt...@salesforce.com>
Authored: Sat Feb 27 09:57:01 2016 -0800
Committer: James Taylor <jt...@salesforce.com>
Committed: Sat Feb 27 09:57:01 2016 -0800

----------------------------------------------------------------------
 .../phoenix/mapreduce/AbstractBulkLoadTool.java |   6 +-
 .../mapreduce/FormatToKeyValueMapper.java       | 164 +++----------------
 .../mapreduce/FormatToKeyValueReducer.java      | 127 ++------------
 .../bulkload/TargetTableRefFunctions.java       |  22 +--
 4 files changed, 38 insertions(+), 281 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/c083bc82/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
index ab2848f..39ee4b1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
@@ -41,7 +41,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
@@ -269,7 +268,7 @@ public abstract class AbstractBulkLoadTool extends Configured implements Tool {
 
         job.setInputFormatClass(TextInputFormat.class);
         job.setMapOutputKeyClass(TableRowkeyPair.class);
-        job.setMapOutputValueClass(ImmutableBytesWritable.class);
+        job.setMapOutputValueClass(KeyValue.class);
         job.setOutputKeyClass(TableRowkeyPair.class);
         job.setOutputValueClass(KeyValue.class);
         job.setReducerClass(FormatToKeyValueReducer.class);
@@ -277,10 +276,7 @@ public abstract class AbstractBulkLoadTool extends Configured implements Tool {
         MultiHfileOutputFormat.configureIncrementalLoad(job, tablesToBeLoaded);
 
         final String tableNamesAsJson = TargetTableRefFunctions.NAMES_TO_JSON.apply(tablesToBeLoaded);
-        final String logicalNamesAsJson = TargetTableRefFunctions.LOGICAN_NAMES_TO_JSON.apply(tablesToBeLoaded);
-
         job.getConfiguration().set(FormatToKeyValueMapper.TABLE_NAMES_CONFKEY,tableNamesAsJson);
-        job.getConfiguration().set(FormatToKeyValueMapper.LOGICAL_NAMES_CONFKEY,logicalNamesAsJson);
 
         // give subclasses their hook
         setupJob(job);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c083bc82/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueMapper.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueMapper.java
index 95b099e..7e115e5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueMapper.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueMapper.java
@@ -17,30 +17,30 @@
  */
 package org.apache.phoenix.mapreduce;
 
-import java.io.DataOutputStream;
 import java.io.IOException;
 import java.sql.SQLException;
-import java.util.*;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
 import javax.annotation.Nullable;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.mapreduce.bulkload.TableRowkeyPair;
 import org.apache.phoenix.mapreduce.bulkload.TargetTableRefFunctions;
 import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
-import org.apache.phoenix.query.QueryConstants;
-import org.apache.phoenix.schema.PColumn;
-import org.apache.phoenix.schema.PTable;
-import org.apache.phoenix.util.*;
+import org.apache.phoenix.util.ColumnInfo;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.UpsertExecutor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -59,7 +59,7 @@ import com.google.common.collect.Lists;
  * to retrieve {@link ColumnInfo} from the target table.
  */
 public abstract class FormatToKeyValueMapper<RECORD> extends Mapper<LongWritable, Text, TableRowkeyPair,
-        ImmutableBytesWritable> {
+        KeyValue> {
 
     protected static final Logger LOG = LoggerFactory.getLogger(FormatToKeyValueMapper.class);
 
@@ -79,7 +79,6 @@ public abstract class FormatToKeyValueMapper<RECORD> extends Mapper<LongWritable
 
     /** Configuration key for the table names */
     public static final String TABLE_NAMES_CONFKEY = "phoenix.mapreduce.import.tablenames";
-    public static final String LOGICAL_NAMES_CONFKEY = "phoenix.mapreduce.import.logicalnames";
 
     /** Configuration key for the table configurations */
     public static final String TABLE_CONFIG_CONFKEY = "phoenix.mapreduce.import.table.config";
@@ -95,14 +94,8 @@ public abstract class FormatToKeyValueMapper<RECORD> extends Mapper<LongWritable
     protected UpsertExecutor<RECORD, ?> upsertExecutor;
     protected ImportPreUpsertKeyValueProcessor preUpdateProcessor;
     protected List<String> tableNames;
-    protected List<String> logicalNames;
     protected MapperUpsertListener<RECORD> upsertListener;
 
-    /*
-    lookup table for column index. Index in the List matches to the index in tableNames List
-     */
-    protected List<Map<byte[], Map<byte[], Integer>>> columnIndexes;
-
     protected abstract UpsertExecutor<RECORD,?> buildUpsertExecutor(Configuration conf);
     protected abstract LineParser<RECORD> getLineParser();
 
@@ -119,17 +112,13 @@ public abstract class FormatToKeyValueMapper<RECORD> extends Mapper<LongWritable
 
         try {
             conn = (PhoenixConnection) QueryUtil.getConnection(clientInfos, conf);
-
-            final String tableNamesConf = conf.get(TABLE_NAMES_CONFKEY);
-            final String logicalNamesConf = conf.get(LOGICAL_NAMES_CONFKEY);
-            tableNames = TargetTableRefFunctions.NAMES_FROM_JSON.apply(tableNamesConf);
-            logicalNames = TargetTableRefFunctions.NAMES_FROM_JSON.apply(logicalNamesConf);
-
-            columnIndexes = initColumnIndexes();
         } catch (SQLException | ClassNotFoundException e) {
             throw new RuntimeException(e);
         }
 
+        final String tableNamesConf = conf.get(TABLE_NAMES_CONFKEY);
+        tableNames = TargetTableRefFunctions.NAMES_FROM_JSON.apply(tableNamesConf);
+
         upsertListener = new MapperUpsertListener<RECORD>(
                 context, conf.getBoolean(IGNORE_INVALID_ROW_CONFKEY, true));
         upsertExecutor = buildUpsertExecutor(conf);
@@ -138,8 +127,7 @@ public abstract class FormatToKeyValueMapper<RECORD> extends Mapper<LongWritable
 
     @SuppressWarnings("deprecation")
     @Override
-    protected void map(LongWritable key, Text value, Context context) throws IOException,
-            InterruptedException {
+    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
         if (conn == null) {
             throw new RuntimeException("Connection not initialized.");
         }
@@ -157,7 +145,7 @@ public abstract class FormatToKeyValueMapper<RECORD> extends Mapper<LongWritable
                 return;
             }
             upsertExecutor.execute(ImmutableList.<RECORD>of(record));
-            Map<Integer, List<KeyValue>> map = new HashMap<>();
+
             Iterator<Pair<byte[], List<KeyValue>>> uncommittedDataIterator
                     = PhoenixRuntime.getUncommittedDataIterator(conn, true);
             while (uncommittedDataIterator.hasNext()) {
@@ -165,125 +153,24 @@ public abstract class FormatToKeyValueMapper<RECORD> extends Mapper<LongWritable
                 List<KeyValue> keyValueList = kvPair.getSecond();
                 keyValueList = preUpdateProcessor.preUpsert(kvPair.getFirst(), keyValueList);
                 byte[] first = kvPair.getFirst();
-                // Create a list of KV for each table
-                for (int i = 0; i < tableNames.size(); i++) {
-                    if (Bytes.compareTo(Bytes.toBytes(tableNames.get(i)), first) == 0) {
-                        if (!map.containsKey(i)) {
-                            map.put(i, new ArrayList<KeyValue>());
-                        }
-                        List<KeyValue> list = map.get(i);
-                        for (KeyValue kv : keyValueList) {
-                            list.add(kv);
-                        }
-                        break;
+                for (String tableName : tableNames) {
+                    if (Bytes.compareTo(Bytes.toBytes(tableName), first) != 0) {
+                        // skip edits for other tables
+                        continue;
+                    }
+                    for (KeyValue kv : keyValueList) {
+                        ImmutableBytesWritable outputKey = new ImmutableBytesWritable();
+                        outputKey.set(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength());
+                        context.write(new TableRowkeyPair(tableName, outputKey), kv);
                     }
                 }
             }
-            for(Map.Entry<Integer, List<KeyValue>> rowEntry : map.entrySet()) {
-                int tableIndex = rowEntry.getKey();
-                List<KeyValue> lkv = rowEntry.getValue();
-                // All KV values combines to a single byte array
-                writeAggregatedRow(context, tableIndex, lkv);
-            }
             conn.rollback();
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
     }
 
-    private List<Map<byte[],Map<byte[], Integer>>> initColumnIndexes() throws SQLException {
-        List<Map<byte[], Map<byte[], Integer>>> tableMap = new ArrayList<>();
-        int tableIndex;
-        for (tableIndex = 0; tableIndex < tableNames.size(); tableIndex++) {
-            PTable table = PhoenixRuntime.getTable(conn, logicalNames.get(tableIndex));
-            Map<byte[], Map<byte[], Integer>> columnMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
-            List<PColumn> cls = table.getColumns();
-            for(int i = 0; i < cls.size(); i++) {
-                PColumn c = cls.get(i);
-                if(c.getFamilyName() == null) continue; // Skip PK column
-                byte[] family = c.getFamilyName().getBytes();
-                byte[] name = c.getName().getBytes();
-                if(!columnMap.containsKey(family)) {
-                    columnMap.put(family, new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR));
-                }
-                Map<byte[], Integer> qualifier = columnMap.get(family);
-                qualifier.put(name, i);
-            }
-            tableMap.add(columnMap);
-        }
-        return tableMap;
-    }
-
-    /**
-     * Find the column index which will replace the column name in
-     * the aggregated array and will be restored in Reducer
-     *
-     * @param tableIndex Table index in tableNames list
-     * @param cell KeyValue for the column
-     * @return column index for the specified cell or -1 if was not found
-     */
-    private int findIndex(int tableIndex, Cell cell) {
-        Map<byte[], Map<byte[], Integer>> columnMap = columnIndexes.get(tableIndex);
-        Map<byte[], Integer> qualifiers = columnMap.get(Bytes.copy(cell.getFamilyArray(),
-                cell.getFamilyOffset(), cell.getFamilyLength()));
-        if(qualifiers!= null) {
-            Integer result = qualifiers.get(Bytes.copy(cell.getQualifierArray(),
-                    cell.getQualifierOffset(), cell.getQualifierLength()));
-            if(result!=null) {
-                return result;
-            }
-        }
-        return -1;
-    }
-
-    /**
-     * Collect all column values for the same rowKey
-     *
-     * @param context Current mapper context
-     * @param tableIndex Table index in tableNames list
-     * @param lkv List of KV values that will be combined in a single ImmutableBytesWritable
-     * @throws IOException
-     * @throws InterruptedException
-     */
-
-    private void writeAggregatedRow(Context context, int tableIndex, List<KeyValue> lkv)
-            throws IOException, InterruptedException {
-        TrustedByteArrayOutputStream bos = new TrustedByteArrayOutputStream(1024);
-        DataOutputStream outputStream = new DataOutputStream(bos);
-        ImmutableBytesWritable outputKey = new ImmutableBytesWritable();
-        if (!lkv.isEmpty()) {
-            // All Key Values for the same row are supposed to be the same, so init rowKey only once
-            Cell first = lkv.get(0);
-            outputKey.set(first.getRowArray(), first.getRowOffset(), first.getRowLength());
-            for (KeyValue cell : lkv) {
-                if(isEmptyCell(cell)) {
-                    continue;
-                }
-                int i = findIndex(tableIndex, cell);
-                if (i == -1) {
-                    throw new IOException("No column found for KeyValue");
-                }
-                WritableUtils.writeVInt(outputStream, i);
-                outputStream.writeByte(cell.getTypeByte());
-                WritableUtils.writeVInt(outputStream, cell.getValueLength());
-                outputStream.write(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
-            }
-        }
-        ImmutableBytesWritable aggregatedArray = new ImmutableBytesWritable(bos.toByteArray());
-        outputStream.close();
-        context.write(new TableRowkeyPair(Integer.toString(tableIndex), outputKey), aggregatedArray);
-    }
-
-    protected boolean isEmptyCell(KeyValue cell) {
-        if (Bytes.compareTo(cell.getQualifierArray(), cell.getQualifierOffset(),
-                cell.getQualifierLength(), QueryConstants.EMPTY_COLUMN_BYTES, 0,
-                QueryConstants.EMPTY_COLUMN_BYTES.length) != 0)
-            return false;
-        else
-            return true;
-    }
-
-
     @Override
     protected void cleanup(Context context) throws IOException, InterruptedException {
         try {
@@ -336,12 +223,11 @@ public abstract class FormatToKeyValueMapper<RECORD> extends Mapper<LongWritable
     @VisibleForTesting
     static class MapperUpsertListener<T> implements UpsertExecutor.UpsertListener<T> {
 
-        private final Mapper<LongWritable, Text,
-                TableRowkeyPair, ImmutableBytesWritable>.Context context;
+        private final Mapper<LongWritable, Text, TableRowkeyPair, KeyValue>.Context context;
         private final boolean ignoreRecordErrors;
 
         private MapperUpsertListener(
-                Mapper<LongWritable, Text, TableRowkeyPair, ImmutableBytesWritable>.Context context,
+                Mapper<LongWritable, Text, TableRowkeyPair, KeyValue>.Context context,
                 boolean ignoreRecordErrors) {
             this.context = context;
             this.ignoreRecordErrors = ignoreRecordErrors;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c083bc82/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java
index 0f90e45..5d00656 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java
@@ -17,143 +17,36 @@
  */
 package org.apache.phoenix.mapreduce;
 
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
 import java.io.IOException;
-import java.sql.SQLException;
-import java.util.*;
+import java.util.TreeSet;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
-import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
-import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.mapreduce.bulkload.TableRowkeyPair;
-import org.apache.phoenix.mapreduce.bulkload.TargetTableRefFunctions;
-import org.apache.phoenix.query.QueryConstants;
-import org.apache.phoenix.schema.PColumn;
-import org.apache.phoenix.schema.PTable;
-import org.apache.phoenix.util.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * Reducer class for the bulkload jobs.
  * Performs similar functionality to {@link KeyValueSortReducer}
  */
 public class FormatToKeyValueReducer
-    extends Reducer<TableRowkeyPair,ImmutableBytesWritable,TableRowkeyPair,KeyValue> {
-
-    protected static final Logger LOG = LoggerFactory.getLogger(FormatToKeyValueReducer.class);
-
-
-    protected List<String> tableNames;
-    protected List<String> logicalNames;
-    protected KeyValueBuilder builder;
-    List<List<Pair<byte[], byte[]>>> columnIndexes;
-    List<ImmutableBytesPtr> emptyFamilyName;
-
-
-    @Override
-    protected void setup(Context context) throws IOException, InterruptedException {
-        Configuration conf = context.getConfiguration();
-
-        // pass client configuration into driver
-        Properties clientInfos = new Properties();
-        for (Map.Entry<String, String> entry : conf) {
-            clientInfos.setProperty(entry.getKey(), entry.getValue());
-        }
-
-        try {
-            PhoenixConnection conn = (PhoenixConnection) QueryUtil.getConnection(clientInfos, conf);
-            builder = conn.getKeyValueBuilder();
-            final String tableNamesConf = conf.get(FormatToKeyValueMapper.TABLE_NAMES_CONFKEY);
-            final String logicalNamesConf = conf.get(FormatToKeyValueMapper.LOGICAL_NAMES_CONFKEY);
-            tableNames = TargetTableRefFunctions.NAMES_FROM_JSON.apply(tableNamesConf);
-            logicalNames = TargetTableRefFunctions.NAMES_FROM_JSON.apply(logicalNamesConf);
-
-            columnIndexes = new ArrayList<>(tableNames.size());
-            emptyFamilyName = new ArrayList<>();
-            initColumnsMap(conn);
-        } catch (SQLException | ClassNotFoundException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    private void initColumnsMap(PhoenixConnection conn) throws SQLException {
-        for (String tableName : logicalNames) {
-            PTable table = PhoenixRuntime.getTable(conn, tableName);
-            emptyFamilyName.add(SchemaUtil.getEmptyColumnFamilyPtr(table));
-            List<PColumn> cls = table.getColumns();
-            List<Pair<byte[], byte[]>> list = new ArrayList(cls.size());
-            for(int i = 0; i < cls.size(); i++) {
-                PColumn c = cls.get(i);
-                if(c.getFamilyName() == null) {
-                    list.add(null); // Skip PK column
-                    continue;
-                }
-                byte[] family = c.getFamilyName().getBytes();
-                byte[] name = c.getName().getBytes();
-                list.add(new Pair(family, name));
-            }
-            columnIndexes.add(list);
-        }
-
-    }
+    extends Reducer<TableRowkeyPair,KeyValue,TableRowkeyPair,KeyValue> {
 
     @Override
-    protected void reduce(TableRowkeyPair key, Iterable<ImmutableBytesWritable> values,
-        Reducer<TableRowkeyPair, ImmutableBytesWritable, TableRowkeyPair, KeyValue>.Context context)
+    protected void reduce(TableRowkeyPair key, Iterable<KeyValue> values,
+        Reducer<TableRowkeyPair, KeyValue, TableRowkeyPair, KeyValue>.Context context)
         throws IOException, InterruptedException {
         TreeSet<KeyValue> map = new TreeSet<KeyValue>(KeyValue.COMPARATOR);
-        int tableIndex = Integer.parseInt(key.getTableName());
-        key.setTableName(tableNames.get(tableIndex));
-        List<Pair<byte[], byte[]>> columns = columnIndexes.get(tableIndex);
-        for (ImmutableBytesWritable aggregatedArray : values) {
-            DataInputStream input = new DataInputStream(new ByteArrayInputStream(aggregatedArray.get()));
-            while (input.available()!= 0) {
-                int index = WritableUtils.readVInt(input);
-                Pair<byte[], byte[]> pair = columns.get(index);
-                byte type = input.readByte();
-                ImmutableBytesWritable value = null;
-                int len = WritableUtils.readVInt(input);
-                if (len > 0) {
-                    byte[] array = new byte[len];
-                    input.read(array);
-                    value = new ImmutableBytesWritable(array);
-                }
-                KeyValue kv;
-                KeyValue.Type kvType = KeyValue.Type.codeToType(type);
-                switch (kvType) {
-                    case Put: // not null value
-                        kv = builder.buildPut(key.getRowkey(),
-                                new ImmutableBytesWritable(pair.getFirst()),
-                                new ImmutableBytesWritable(pair.getSecond()), value);
-                        break;
-                    case DeleteColumn: // null value
-                        kv = builder.buildDeleteColumns(key.getRowkey(),
-                                new ImmutableBytesWritable(pair.getFirst()),
-                                new ImmutableBytesWritable(pair.getSecond()));
-                        break;
-                    default:
-                        throw new IOException("Unsupported KeyValue type " + kvType);
-                }
-                map.add(kv);
+        for (KeyValue kv: values) {
+            try {
+                map.add(kv.clone());
+            } catch (CloneNotSupportedException e) {
+                throw new java.io.IOException(e);
             }
-            KeyValue empty = builder.buildPut(key.getRowkey(),
-                    emptyFamilyName.get(tableIndex),
-                    QueryConstants.EMPTY_COLUMN_BYTES_PTR, ByteUtil.EMPTY_BYTE_ARRAY_PTR);
-            map.add(empty);
-            Closeables.closeQuietly(input);
         }
         context.setStatus("Read " + map.getClass());
         int index = 0;
-        for (KeyValue kv : map) {
+        for (KeyValue kv: map) {
             context.write(key, kv);
             if (++index % 100 == 0) context.setStatus("Wrote " + index);
         }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/c083bc82/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/bulkload/TargetTableRefFunctions.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/bulkload/TargetTableRefFunctions.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/bulkload/TargetTableRefFunctions.java
index e02065f..d786842 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/bulkload/TargetTableRefFunctions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/bulkload/TargetTableRefFunctions.java
@@ -78,25 +78,7 @@ public class TargetTableRefFunctions {
          }
      };
 
-    public static Function<List<TargetTableRef>,String> LOGICAN_NAMES_TO_JSON =  new Function<List<TargetTableRef>,String>() {
-
-        @Override
-        public String apply(List<TargetTableRef> input) {
-            try {
-                List<String> tableNames = Lists.newArrayListWithCapacity(input.size());
-                for(TargetTableRef table : input) {
-                    tableNames.add(table.getLogicalName());
-                }
-                ObjectMapper mapper = new ObjectMapper();
-                return mapper.writeValueAsString(tableNames);
-            } catch (IOException e) {
-                throw new RuntimeException(e);
-            }
-
-        }
-    };
-
-    public static Function<String,List<String>> NAMES_FROM_JSON =  new Function<String,List<String>>() {
+     public static Function<String,List<String>> NAMES_FROM_JSON =  new Function<String,List<String>>() {
 
          @SuppressWarnings("unchecked")
          @Override
@@ -110,4 +92,4 @@ public class TargetTableRefFunctions {
 
          }
      };
-}
+ }