You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ma...@apache.org on 2016/03/29 21:50:43 UTC

[10/30] phoenix git commit: PHOENIX-2733 Minor cleanup for improvements to CSV Bulk Loader performance (Sergey Soldatov)

PHOENIX-2733 Minor cleanup for improvements to CSV Bulk Loader performance (Sergey Soldatov)


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/952a01cc
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/952a01cc
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/952a01cc

Branch: refs/heads/calcite
Commit: 952a01cce6d7d314ca26e89a398e75f38690cdad
Parents: 1d03e02
Author: James Taylor <jt...@salesforce.com>
Authored: Wed Mar 2 13:54:05 2016 -0800
Committer: James Taylor <jt...@salesforce.com>
Committed: Wed Mar 2 13:54:24 2016 -0800

----------------------------------------------------------------------
 .../phoenix/mapreduce/AbstractBulkLoadTool.java |  12 +-
 .../phoenix/mapreduce/CsvToKeyValueMapper.java  |   2 +-
 .../mapreduce/FormatToBytesWritableMapper.java  | 385 +++++++++++++++++++
 .../mapreduce/FormatToKeyValueMapper.java       | 378 ------------------
 .../mapreduce/FormatToKeyValueReducer.java      |  28 +-
 .../phoenix/mapreduce/JsonToKeyValueMapper.java |   2 +-
 .../bulkload/TargetTableRefFunctions.java       |   2 +-
 .../util/PhoenixConfigurationUtil.java          |   4 +-
 .../FormatToBytesWritableMapperTest.java        | 102 +++++
 .../mapreduce/FormatToKeyValueMapperTest.java   | 102 -----
 10 files changed, 516 insertions(+), 501 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/952a01cc/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
index ab2848f..ff73530 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
@@ -209,10 +209,10 @@ public abstract class AbstractBulkLoadTool extends Configured implements Tool {
         List<ColumnInfo> importColumns = buildImportColumns(conn, cmdLine, qualifiedTableName);
         Preconditions.checkNotNull(importColumns);
         Preconditions.checkArgument(!importColumns.isEmpty(), "Column info list is empty");
-        FormatToKeyValueMapper.configureColumnInfoList(conf, importColumns);
+        FormatToBytesWritableMapper.configureColumnInfoList(conf, importColumns);
         boolean ignoreInvalidRows = cmdLine.hasOption(IGNORE_ERRORS_OPT.getOpt());
-        conf.setBoolean(FormatToKeyValueMapper.IGNORE_INVALID_ROW_CONFKEY, ignoreInvalidRows);
-        conf.set(FormatToKeyValueMapper.TABLE_NAME_CONFKEY, qualifiedTableName);
+        conf.setBoolean(FormatToBytesWritableMapper.IGNORE_INVALID_ROW_CONFKEY, ignoreInvalidRows);
+        conf.set(FormatToBytesWritableMapper.TABLE_NAME_CONFKEY, qualifiedTableName);
 
         // give subclasses their hook
         configureOptions(cmdLine, importColumns, conf);
@@ -277,10 +277,10 @@ public abstract class AbstractBulkLoadTool extends Configured implements Tool {
         MultiHfileOutputFormat.configureIncrementalLoad(job, tablesToBeLoaded);
 
         final String tableNamesAsJson = TargetTableRefFunctions.NAMES_TO_JSON.apply(tablesToBeLoaded);
-        final String logicalNamesAsJson = TargetTableRefFunctions.LOGICAN_NAMES_TO_JSON.apply(tablesToBeLoaded);
+        final String logicalNamesAsJson = TargetTableRefFunctions.LOGICAL_NAMES_TO_JSON.apply(tablesToBeLoaded);
 
-        job.getConfiguration().set(FormatToKeyValueMapper.TABLE_NAMES_CONFKEY,tableNamesAsJson);
-        job.getConfiguration().set(FormatToKeyValueMapper.LOGICAL_NAMES_CONFKEY,logicalNamesAsJson);
+        job.getConfiguration().set(FormatToBytesWritableMapper.TABLE_NAMES_CONFKEY,tableNamesAsJson);
+        job.getConfiguration().set(FormatToBytesWritableMapper.LOGICAL_NAMES_CONFKEY,logicalNamesAsJson);
 
         // give subclasses their hook
         setupJob(job);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/952a01cc/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java
index 2cb1ac7..e5a6e5b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java
@@ -41,7 +41,7 @@ import com.google.common.collect.Iterables;
  * extracting the created KeyValues and rolling back the statement execution before it is
  * committed to HBase.
  */
-public class CsvToKeyValueMapper extends FormatToKeyValueMapper<CSVRecord> {
+public class CsvToKeyValueMapper extends FormatToBytesWritableMapper<CSVRecord> {
 
     /** Configuration key for the field delimiter for input csv records */
     public static final String FIELD_DELIMITER_CONFKEY = "phoenix.mapreduce.import.fielddelimiter";

http://git-wip-us.apache.org/repos/asf/phoenix/blob/952a01cc/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
new file mode 100644
index 0000000..2c9b6d9
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
@@ -0,0 +1,385 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.TreeMap;
+import javax.annotation.Nullable;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.mapreduce.bulkload.TableRowkeyPair;
+import org.apache.phoenix.mapreduce.bulkload.TargetTableRefFunctions;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.util.ColumnInfo;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.UpsertExecutor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.base.Splitter;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+
+/**
+ * Base class for converting some input source format into {@link ImmutableBytesWritable}s that
+ * contains packed in a single byte array values for all columns.
+ * Assumes input format is text-based, with one row per line. Depends on an online cluster
+ * to retrieve {@link ColumnInfo} from the target table.
+ */
+public abstract class FormatToBytesWritableMapper<RECORD> extends Mapper<LongWritable, Text, TableRowkeyPair,
+        ImmutableBytesWritable> {
+
+    protected static final Logger LOG = LoggerFactory.getLogger(FormatToBytesWritableMapper.class);
+
+    protected static final String COUNTER_GROUP_NAME = "Phoenix MapReduce Import";
+
+    /** Configuration key for the name of the output table */
+    public static final String TABLE_NAME_CONFKEY = "phoenix.mapreduce.import.tablename";
+
+    /** Configuration key for the columns to be imported */
+    public static final String COLUMN_INFO_CONFKEY = "phoenix.mapreduce.import.columninfos";
+
+    /** Configuration key for the flag to ignore invalid rows */
+    public static final String IGNORE_INVALID_ROW_CONFKEY = "phoenix.mapreduce.import.ignoreinvalidrow";
+
+    /** Configuration key for the table names */
+    public static final String TABLE_NAMES_CONFKEY = "phoenix.mapreduce.import.tablenames";
+
+    /** Configuration key for the table logical names */
+    public static final String LOGICAL_NAMES_CONFKEY = "phoenix.mapreduce.import.logicalnames";
+
+    /**
+     * Parses a single input line, returning a {@code T}.
+     */
+    public interface LineParser<T> {
+        T parse(String input) throws IOException;
+    }
+
+    protected PhoenixConnection conn;
+    protected UpsertExecutor<RECORD, ?> upsertExecutor;
+    protected ImportPreUpsertKeyValueProcessor preUpdateProcessor;
+    protected List<String> tableNames;
+    protected List<String> logicalNames;
+    protected MapperUpsertListener<RECORD> upsertListener;
+
+    /*
+    lookup table for column index. Index in the List matches to the index in tableNames List
+     */
+    protected List<Map<byte[], Map<byte[], Integer>>> columnIndexes;
+
+    protected abstract UpsertExecutor<RECORD,?> buildUpsertExecutor(Configuration conf);
+    protected abstract LineParser<RECORD> getLineParser();
+
+    @Override
+    protected void setup(Context context) throws IOException, InterruptedException {
+
+        Configuration conf = context.getConfiguration();
+
+        // pass client configuration into driver
+        Properties clientInfos = new Properties();
+        for (Map.Entry<String, String> entry : conf) {
+            clientInfos.setProperty(entry.getKey(), entry.getValue());
+        }
+
+        try {
+            conn = (PhoenixConnection) QueryUtil.getConnection(clientInfos, conf);
+
+            final String tableNamesConf = conf.get(TABLE_NAMES_CONFKEY);
+            final String logicalNamesConf = conf.get(LOGICAL_NAMES_CONFKEY);
+            tableNames = TargetTableRefFunctions.NAMES_FROM_JSON.apply(tableNamesConf);
+            logicalNames = TargetTableRefFunctions.NAMES_FROM_JSON.apply(logicalNamesConf);
+
+            columnIndexes = initColumnIndexes();
+        } catch (SQLException | ClassNotFoundException e) {
+            throw new RuntimeException(e);
+        }
+
+        upsertListener = new MapperUpsertListener<RECORD>(
+                context, conf.getBoolean(IGNORE_INVALID_ROW_CONFKEY, true));
+        upsertExecutor = buildUpsertExecutor(conf);
+        preUpdateProcessor = PhoenixConfigurationUtil.loadPreUpsertProcessor(conf);
+    }
+
+    @SuppressWarnings("deprecation")
+    @Override
+    protected void map(LongWritable key, Text value, Context context) throws IOException,
+            InterruptedException {
+        if (conn == null) {
+            throw new RuntimeException("Connection not initialized.");
+        }
+        try {
+            RECORD record = null;
+            try {
+                record = getLineParser().parse(value.toString());
+            } catch (IOException e) {
+                context.getCounter(COUNTER_GROUP_NAME, "Parser errors").increment(1L);
+                return;
+            }
+
+            if (record == null) {
+                context.getCounter(COUNTER_GROUP_NAME, "Empty records").increment(1L);
+                return;
+            }
+            upsertExecutor.execute(ImmutableList.<RECORD>of(record));
+            Map<Integer, List<KeyValue>> map = new HashMap<>();
+            Iterator<Pair<byte[], List<KeyValue>>> uncommittedDataIterator
+                    = PhoenixRuntime.getUncommittedDataIterator(conn, true);
+            while (uncommittedDataIterator.hasNext()) {
+                Pair<byte[], List<KeyValue>> kvPair = uncommittedDataIterator.next();
+                List<KeyValue> keyValueList = kvPair.getSecond();
+                keyValueList = preUpdateProcessor.preUpsert(kvPair.getFirst(), keyValueList);
+                byte[] first = kvPair.getFirst();
+                // Create a list of KV for each table
+                for (int i = 0; i < tableNames.size(); i++) {
+                    if (Bytes.compareTo(Bytes.toBytes(tableNames.get(i)), first) == 0) {
+                        if (!map.containsKey(i)) {
+                            map.put(i, new ArrayList<KeyValue>());
+                        }
+                        List<KeyValue> list = map.get(i);
+                        for (KeyValue kv : keyValueList) {
+                            list.add(kv);
+                        }
+                        break;
+                    }
+                }
+            }
+            for (Map.Entry<Integer, List<KeyValue>> rowEntry : map.entrySet()) {
+                int tableIndex = rowEntry.getKey();
+                List<KeyValue> lkv = rowEntry.getValue();
+                // All KV values combines to a single byte array
+                writeAggregatedRow(context, tableIndex, lkv);
+            }
+            conn.rollback();
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private List<Map<byte[], Map<byte[], Integer>>> initColumnIndexes() throws SQLException {
+        List<Map<byte[], Map<byte[], Integer>>> tableMap = new ArrayList<>();
+        int tableIndex;
+        for (tableIndex = 0; tableIndex < tableNames.size(); tableIndex++) {
+            PTable table = PhoenixRuntime.getTable(conn, logicalNames.get(tableIndex));
+            Map<byte[], Map<byte[], Integer>> columnMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+            List<PColumn> cls = table.getColumns();
+            for (int i = 0; i < cls.size(); i++) {
+                PColumn c = cls.get(i);
+                if (c.getFamilyName() == null) continue; // Skip PK column
+                byte[] family = c.getFamilyName().getBytes();
+                byte[] name = c.getName().getBytes();
+                if (!columnMap.containsKey(family)) {
+                    columnMap.put(family, new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR));
+                }
+                Map<byte[], Integer> qualifier = columnMap.get(family);
+                qualifier.put(name, i);
+            }
+            tableMap.add(columnMap);
+        }
+        return tableMap;
+    }
+
+    /**
+     * Find the column index which will replace the column name in
+     * the aggregated array and will be restored in Reducer
+     *
+     * @param tableIndex Table index in tableNames list
+     * @param cell       KeyValue for the column
+     * @return column index for the specified cell or -1 if was not found
+     */
+    private int findIndex(int tableIndex, Cell cell) {
+        Map<byte[], Map<byte[], Integer>> columnMap = columnIndexes.get(tableIndex);
+        Map<byte[], Integer> qualifiers = columnMap.get(Bytes.copy(cell.getFamilyArray(),
+                cell.getFamilyOffset(), cell.getFamilyLength()));
+        if (qualifiers != null) {
+            Integer result = qualifiers.get(Bytes.copy(cell.getQualifierArray(),
+                    cell.getQualifierOffset(), cell.getQualifierLength()));
+            if (result != null) {
+                return result;
+            }
+        }
+        return -1;
+    }
+
+    /**
+     * Collect all column values for the same rowKey
+     *
+     * @param context    Current mapper context
+     * @param tableIndex Table index in tableNames list
+     * @param lkv        List of KV values that will be combined in a single ImmutableBytesWritable
+     * @throws IOException
+     * @throws InterruptedException
+     */
+
+    private void writeAggregatedRow(Context context, int tableIndex, List<KeyValue> lkv)
+            throws IOException, InterruptedException {
+        ByteArrayOutputStream bos = new ByteArrayOutputStream(1024);
+        DataOutputStream outputStream = new DataOutputStream(bos);
+        ImmutableBytesWritable outputKey = new ImmutableBytesWritable();
+        if (!lkv.isEmpty()) {
+            // All Key Values for the same row are supposed to be the same, so init rowKey only once
+            Cell first = lkv.get(0);
+            outputKey.set(first.getRowArray(), first.getRowOffset(), first.getRowLength());
+            for (KeyValue cell : lkv) {
+                if (isEmptyCell(cell)) {
+                    continue;
+                }
+                int i = findIndex(tableIndex, cell);
+                if (i == -1) {
+                    throw new IOException("No column found for KeyValue");
+                }
+                WritableUtils.writeVInt(outputStream, i);
+                outputStream.writeByte(cell.getTypeByte());
+                WritableUtils.writeVInt(outputStream, cell.getValueLength());
+                outputStream.write(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
+            }
+        }
+        ImmutableBytesWritable aggregatedArray = new ImmutableBytesWritable(bos.toByteArray());
+        outputStream.close();
+        context.write(new TableRowkeyPair(tableNames.get(tableIndex), outputKey), aggregatedArray);
+    }
+
+    protected boolean isEmptyCell(KeyValue cell) {
+        if (Bytes.compareTo(cell.getQualifierArray(), cell.getQualifierOffset(),
+                cell.getQualifierLength(), QueryConstants.EMPTY_COLUMN_BYTES, 0,
+                QueryConstants.EMPTY_COLUMN_BYTES.length) != 0)
+            return false;
+        else
+            return true;
+    }
+
+
+    @Override
+    protected void cleanup(Context context) throws IOException, InterruptedException {
+        try {
+            if (conn != null) {
+                conn.close();
+            }
+        } catch (SQLException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Write the list of to-import columns to a job configuration.
+     *
+     * @param conf           configuration to be written to
+     * @param columnInfoList list of ColumnInfo objects to be configured for import
+     */
+    @VisibleForTesting
+    static void configureColumnInfoList(Configuration conf, List<ColumnInfo> columnInfoList) {
+        conf.set(COLUMN_INFO_CONFKEY, Joiner.on("|").useForNull("").join(columnInfoList));
+    }
+
+    /**
+     * Build the list of ColumnInfos for the import based on information in the configuration.
+     */
+    @VisibleForTesting
+    static List<ColumnInfo> buildColumnInfoList(Configuration conf) {
+
+        return Lists.newArrayList(
+                Iterables.transform(
+                        Splitter.on("|").split(conf.get(COLUMN_INFO_CONFKEY)),
+                        new Function<String, ColumnInfo>() {
+                            @Nullable
+                            @Override
+                            public ColumnInfo apply(@Nullable String input) {
+                                if (input == null || input.isEmpty()) {
+                                    // An empty string represents a null that was passed in to
+                                    // the configuration, which corresponds to an input column
+                                    // which is to be skipped
+                                    return null;
+                                }
+                                return ColumnInfo.fromString(input);
+                            }
+                        }));
+    }
+
+    /**
+     * Listener that logs successful upserts and errors to job counters.
+     */
+    @VisibleForTesting
+    static class MapperUpsertListener<T> implements UpsertExecutor.UpsertListener<T> {
+
+        private final Mapper<LongWritable, Text,
+                TableRowkeyPair, ImmutableBytesWritable>.Context context;
+        private final boolean ignoreRecordErrors;
+
+        private MapperUpsertListener(
+                Mapper<LongWritable, Text, TableRowkeyPair, ImmutableBytesWritable>.Context context,
+                boolean ignoreRecordErrors) {
+            this.context = context;
+            this.ignoreRecordErrors = ignoreRecordErrors;
+        }
+
+        @Override
+        public void upsertDone(long upsertCount) {
+            context.getCounter(COUNTER_GROUP_NAME, "Upserts Done").increment(1L);
+        }
+
+        @Override
+        public void errorOnRecord(T record, Throwable throwable) {
+            LOG.error("Error on record " + record, throwable);
+            context.getCounter(COUNTER_GROUP_NAME, "Errors on records").increment(1L);
+            if (!ignoreRecordErrors) {
+                throw Throwables.propagate(throwable);
+            }
+        }
+    }
+
+    /**
+     * A default implementation of {@code ImportPreUpsertKeyValueProcessor} that is used if no
+     * specific class is configured. This implementation simply passes through the KeyValue
+     * list that is passed in.
+     */
+    public static class DefaultImportPreUpsertKeyValueProcessor implements
+            ImportPreUpsertKeyValueProcessor {
+
+        @Override
+        public List<KeyValue> preUpsert(byte[] rowKey, List<KeyValue> keyValues) {
+            return keyValues;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/952a01cc/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueMapper.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueMapper.java
deleted file mode 100644
index 8dbb4aa..0000000
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueMapper.java
+++ /dev/null
@@ -1,378 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.mapreduce;
-
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.sql.SQLException;
-import java.util.*;
-import javax.annotation.Nullable;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.phoenix.jdbc.PhoenixConnection;
-import org.apache.phoenix.mapreduce.bulkload.TableRowkeyPair;
-import org.apache.phoenix.mapreduce.bulkload.TargetTableRefFunctions;
-import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
-import org.apache.phoenix.query.QueryConstants;
-import org.apache.phoenix.schema.PColumn;
-import org.apache.phoenix.schema.PTable;
-import org.apache.phoenix.util.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.base.Joiner;
-import com.google.common.base.Splitter;
-import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-
-/**
- * Base class for converting some input source format into {@link KeyValue}s of a target
- * schema. Assumes input format is text-based, with one row per line. Depends on an online cluster
- * to retrieve {@link ColumnInfo} from the target table.
- */
-public abstract class FormatToKeyValueMapper<RECORD> extends Mapper<LongWritable, Text, TableRowkeyPair,
-        ImmutableBytesWritable> {
-
-    protected static final Logger LOG = LoggerFactory.getLogger(FormatToKeyValueMapper.class);
-
-    protected static final String COUNTER_GROUP_NAME = "Phoenix MapReduce Import";
-
-    /** Configuration key for the name of the output table */
-    public static final String TABLE_NAME_CONFKEY = "phoenix.mapreduce.import.tablename";
-
-    /** Configuration key for the name of the output index table */
-    public static final String INDEX_TABLE_NAME_CONFKEY = "phoenix.mapreduce.import.indextablename";
-
-    /** Configuration key for the columns to be imported */
-    public static final String COLUMN_INFO_CONFKEY = "phoenix.mapreduce.import.columninfos";
-
-    /** Configuration key for the flag to ignore invalid rows */
-    public static final String IGNORE_INVALID_ROW_CONFKEY = "phoenix.mapreduce.import.ignoreinvalidrow";
-
-    /** Configuration key for the table names */
-    public static final String TABLE_NAMES_CONFKEY = "phoenix.mapreduce.import.tablenames";
-    public static final String LOGICAL_NAMES_CONFKEY = "phoenix.mapreduce.import.logicalnames";
-
-    /** Configuration key for the table configurations */
-    public static final String TABLE_CONFIG_CONFKEY = "phoenix.mapreduce.import.table.config";
-
-    /**
-     * Parses a single input line, returning a {@code T}.
-     */
-    public interface LineParser<T> {
-        T parse(String input) throws IOException;
-    }
-
-    protected PhoenixConnection conn;
-    protected UpsertExecutor<RECORD, ?> upsertExecutor;
-    protected ImportPreUpsertKeyValueProcessor preUpdateProcessor;
-    protected List<String> tableNames;
-    protected List<String> logicalNames;
-    protected MapperUpsertListener<RECORD> upsertListener;
-
-    /*
-    lookup table for column index. Index in the List matches to the index in tableNames List
-     */
-    protected List<Map<byte[], Map<byte[], Integer>>> columnIndexes;
-
-    protected abstract UpsertExecutor<RECORD,?> buildUpsertExecutor(Configuration conf);
-    protected abstract LineParser<RECORD> getLineParser();
-
-    @Override
-    protected void setup(Context context) throws IOException, InterruptedException {
-
-        Configuration conf = context.getConfiguration();
-
-        // pass client configuration into driver
-        Properties clientInfos = new Properties();
-        for (Map.Entry<String, String> entry : conf) {
-            clientInfos.setProperty(entry.getKey(), entry.getValue());
-        }
-
-        try {
-            conn = (PhoenixConnection) QueryUtil.getConnection(clientInfos, conf);
-
-            final String tableNamesConf = conf.get(TABLE_NAMES_CONFKEY);
-            final String logicalNamesConf = conf.get(LOGICAL_NAMES_CONFKEY);
-            tableNames = TargetTableRefFunctions.NAMES_FROM_JSON.apply(tableNamesConf);
-            logicalNames = TargetTableRefFunctions.NAMES_FROM_JSON.apply(logicalNamesConf);
-
-            columnIndexes = initColumnIndexes();
-        } catch (SQLException | ClassNotFoundException e) {
-            throw new RuntimeException(e);
-        }
-
-        upsertListener = new MapperUpsertListener<RECORD>(
-                context, conf.getBoolean(IGNORE_INVALID_ROW_CONFKEY, true));
-        upsertExecutor = buildUpsertExecutor(conf);
-        preUpdateProcessor = PhoenixConfigurationUtil.loadPreUpsertProcessor(conf);
-    }
-
-    @SuppressWarnings("deprecation")
-    @Override
-    protected void map(LongWritable key, Text value, Context context) throws IOException,
-            InterruptedException {
-        if (conn == null) {
-            throw new RuntimeException("Connection not initialized.");
-        }
-        try {
-            RECORD record = null;
-            try {
-                record = getLineParser().parse(value.toString());
-            } catch (IOException e) {
-                context.getCounter(COUNTER_GROUP_NAME, "Parser errors").increment(1L);
-                return;
-            }
-
-            if (record == null) {
-                context.getCounter(COUNTER_GROUP_NAME, "Empty records").increment(1L);
-                return;
-            }
-            upsertExecutor.execute(ImmutableList.<RECORD>of(record));
-            Map<Integer, List<KeyValue>> map = new HashMap<>();
-            Iterator<Pair<byte[], List<KeyValue>>> uncommittedDataIterator
-                    = PhoenixRuntime.getUncommittedDataIterator(conn, true);
-            while (uncommittedDataIterator.hasNext()) {
-                Pair<byte[], List<KeyValue>> kvPair = uncommittedDataIterator.next();
-                List<KeyValue> keyValueList = kvPair.getSecond();
-                keyValueList = preUpdateProcessor.preUpsert(kvPair.getFirst(), keyValueList);
-                byte[] first = kvPair.getFirst();
-                // Create a list of KV for each table
-                for (int i = 0; i < tableNames.size(); i++) {
-                    if (Bytes.compareTo(Bytes.toBytes(tableNames.get(i)), first) == 0) {
-                        if (!map.containsKey(i)) {
-                            map.put(i, new ArrayList<KeyValue>());
-                        }
-                        List<KeyValue> list = map.get(i);
-                        for (KeyValue kv : keyValueList) {
-                            list.add(kv);
-                        }
-                        break;
-                    }
-                }
-            }
-            for(Map.Entry<Integer, List<KeyValue>> rowEntry : map.entrySet()) {
-                int tableIndex = rowEntry.getKey();
-                List<KeyValue> lkv = rowEntry.getValue();
-                // All KV values combines to a single byte array
-                writeAggregatedRow(context, tableIndex, lkv);
-            }
-            conn.rollback();
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    private List<Map<byte[],Map<byte[], Integer>>> initColumnIndexes() throws SQLException {
-        List<Map<byte[], Map<byte[], Integer>>> tableMap = new ArrayList<>();
-        int tableIndex;
-        for (tableIndex = 0; tableIndex < tableNames.size(); tableIndex++) {
-            PTable table = PhoenixRuntime.getTable(conn, logicalNames.get(tableIndex));
-            Map<byte[], Map<byte[], Integer>> columnMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
-            List<PColumn> cls = table.getColumns();
-            for(int i = 0; i < cls.size(); i++) {
-                PColumn c = cls.get(i);
-                if(c.getFamilyName() == null) continue; // Skip PK column
-                byte[] family = c.getFamilyName().getBytes();
-                byte[] name = c.getName().getBytes();
-                if(!columnMap.containsKey(family)) {
-                    columnMap.put(family, new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR));
-                }
-                Map<byte[], Integer> qualifier = columnMap.get(family);
-                qualifier.put(name, i);
-            }
-            tableMap.add(columnMap);
-        }
-        return tableMap;
-    }
-
-    /**
-     * Find the column index which will replace the column name in
-     * the aggregated array and will be restored in Reducer
-     *
-     * @param tableIndex Table index in tableNames list
-     * @param cell KeyValue for the column
-     * @return column index for the specified cell or -1 if was not found
-     */
-    private int findIndex(int tableIndex, Cell cell) {
-        Map<byte[], Map<byte[], Integer>> columnMap = columnIndexes.get(tableIndex);
-        Map<byte[], Integer> qualifiers = columnMap.get(Bytes.copy(cell.getFamilyArray(),
-                cell.getFamilyOffset(), cell.getFamilyLength()));
-        if(qualifiers!= null) {
-            Integer result = qualifiers.get(Bytes.copy(cell.getQualifierArray(),
-                    cell.getQualifierOffset(), cell.getQualifierLength()));
-            if(result!=null) {
-                return result;
-            }
-        }
-        return -1;
-    }
-
-    /**
-     * Collect all column values for the same rowKey
-     *
-     * @param context Current mapper context
-     * @param tableIndex Table index in tableNames list
-     * @param lkv List of KV values that will be combined in a single ImmutableBytesWritable
-     * @throws IOException
-     * @throws InterruptedException
-     */
-
-    private void writeAggregatedRow(Context context, int tableIndex, List<KeyValue> lkv)
-            throws IOException, InterruptedException {
-        TrustedByteArrayOutputStream bos = new TrustedByteArrayOutputStream(1024);
-        DataOutputStream outputStream = new DataOutputStream(bos);
-        ImmutableBytesWritable outputKey = new ImmutableBytesWritable();
-        if (!lkv.isEmpty()) {
-            // All Key Values for the same row are supposed to be the same, so init rowKey only once
-            Cell first = lkv.get(0);
-            outputKey.set(first.getRowArray(), first.getRowOffset(), first.getRowLength());
-            for (KeyValue cell : lkv) {
-                if(isEmptyCell(cell)) {
-                    continue;
-                }
-                int i = findIndex(tableIndex, cell);
-                if (i == -1) {
-                    throw new IOException("No column found for KeyValue");
-                }
-                WritableUtils.writeVInt(outputStream, i);
-                outputStream.writeByte(cell.getTypeByte());
-                WritableUtils.writeVInt(outputStream, cell.getValueLength());
-                outputStream.write(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
-            }
-        }
-        ImmutableBytesWritable aggregatedArray = new ImmutableBytesWritable(bos.toByteArray());
-        outputStream.close();
-        context.write(new TableRowkeyPair(tableNames.get(tableIndex), outputKey), aggregatedArray);
-    }
-
-    protected boolean isEmptyCell(KeyValue cell) {
-        if (Bytes.compareTo(cell.getQualifierArray(), cell.getQualifierOffset(),
-                cell.getQualifierLength(), QueryConstants.EMPTY_COLUMN_BYTES, 0,
-                QueryConstants.EMPTY_COLUMN_BYTES.length) != 0)
-            return false;
-        else
-            return true;
-    }
-
-
-    @Override
-    protected void cleanup(Context context) throws IOException, InterruptedException {
-        try {
-            if (conn != null) {
-                conn.close();
-            }
-        } catch (SQLException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    /**
-     * Write the list of to-import columns to a job configuration.
-     *
-     * @param conf configuration to be written to
-     * @param columnInfoList list of ColumnInfo objects to be configured for import
-     */
-    @VisibleForTesting
-    static void configureColumnInfoList(Configuration conf, List<ColumnInfo> columnInfoList) {
-        conf.set(COLUMN_INFO_CONFKEY, Joiner.on("|").useForNull("").join(columnInfoList));
-    }
-
-    /**
-     * Build the list of ColumnInfos for the import based on information in the configuration.
-     */
-    @VisibleForTesting
-    static List<ColumnInfo> buildColumnInfoList(Configuration conf) {
-
-        return Lists.newArrayList(
-                Iterables.transform(
-                        Splitter.on("|").split(conf.get(COLUMN_INFO_CONFKEY)),
-                        new Function<String, ColumnInfo>() {
-                            @Nullable
-                            @Override
-                            public ColumnInfo apply(@Nullable String input) {
-                                if (input == null || input.isEmpty()) {
-                                    // An empty string represents a null that was passed in to
-                                    // the configuration, which corresponds to an input column
-                                    // which is to be skipped
-                                    return null;
-                                }
-                                return ColumnInfo.fromString(input);
-                            }
-                        }));
-    }
-
-    /**
-     * Listener that logs successful upserts and errors to job counters.
-     */
-    @VisibleForTesting
-    static class MapperUpsertListener<T> implements UpsertExecutor.UpsertListener<T> {
-
-        private final Mapper<LongWritable, Text,
-                TableRowkeyPair, ImmutableBytesWritable>.Context context;
-        private final boolean ignoreRecordErrors;
-
-        private MapperUpsertListener(
-                Mapper<LongWritable, Text, TableRowkeyPair, ImmutableBytesWritable>.Context context,
-                boolean ignoreRecordErrors) {
-            this.context = context;
-            this.ignoreRecordErrors = ignoreRecordErrors;
-        }
-
-        @Override
-        public void upsertDone(long upsertCount) {
-            context.getCounter(COUNTER_GROUP_NAME, "Upserts Done").increment(1L);
-        }
-
-        @Override
-        public void errorOnRecord(T record, Throwable throwable) {
-            LOG.error("Error on record " + record, throwable);
-            context.getCounter(COUNTER_GROUP_NAME, "Errors on records").increment(1L);
-            if (!ignoreRecordErrors) {
-                throw Throwables.propagate(throwable);
-            }
-        }
-    }
-
-    /**
-     * A default implementation of {@code ImportPreUpsertKeyValueProcessor} that is used if no
-     * specific class is configured. This implementation simply passes through the KeyValue
-     * list that is passed in.
-     */
-    public static class DefaultImportPreUpsertKeyValueProcessor implements
-            ImportPreUpsertKeyValueProcessor {
-
-        @Override
-        public List<KeyValue> preUpsert(byte[] rowKey, List<KeyValue> keyValues) {
-            return keyValues;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/952a01cc/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java
index fb61855..e906431 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java
@@ -21,7 +21,11 @@ import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 import java.io.IOException;
 import java.sql.SQLException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.TreeSet;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.KeyValue;
@@ -38,7 +42,11 @@ import org.apache.phoenix.mapreduce.bulkload.TargetTableRefFunctions;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.PColumn;
 import org.apache.phoenix.schema.PTable;
-import org.apache.phoenix.util.*;
+import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.Closeables;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.SchemaUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -47,7 +55,7 @@ import org.slf4j.LoggerFactory;
  * Performs similar functionality to {@link KeyValueSortReducer}
  */
 public class FormatToKeyValueReducer
-    extends Reducer<TableRowkeyPair,ImmutableBytesWritable,TableRowkeyPair,KeyValue> {
+        extends Reducer<TableRowkeyPair, ImmutableBytesWritable, TableRowkeyPair, KeyValue> {
 
     protected static final Logger LOG = LoggerFactory.getLogger(FormatToKeyValueReducer.class);
 
@@ -72,8 +80,8 @@ public class FormatToKeyValueReducer
         try {
             PhoenixConnection conn = (PhoenixConnection) QueryUtil.getConnection(clientInfos, conf);
             builder = conn.getKeyValueBuilder();
-            final String tableNamesConf = conf.get(FormatToKeyValueMapper.TABLE_NAMES_CONFKEY);
-            final String logicalNamesConf = conf.get(FormatToKeyValueMapper.LOGICAL_NAMES_CONFKEY);
+            final String tableNamesConf = conf.get(FormatToBytesWritableMapper.TABLE_NAMES_CONFKEY);
+            final String logicalNamesConf = conf.get(FormatToBytesWritableMapper.LOGICAL_NAMES_CONFKEY);
             tableNames = TargetTableRefFunctions.NAMES_FROM_JSON.apply(tableNamesConf);
             logicalNames = TargetTableRefFunctions.NAMES_FROM_JSON.apply(logicalNamesConf);
 
@@ -91,9 +99,9 @@ public class FormatToKeyValueReducer
             emptyFamilyName.add(SchemaUtil.getEmptyColumnFamilyPtr(table));
             List<PColumn> cls = table.getColumns();
             List<Pair<byte[], byte[]>> list = new ArrayList(cls.size());
-            for(int i = 0; i < cls.size(); i++) {
+            for (int i = 0; i < cls.size(); i++) {
                 PColumn c = cls.get(i);
-                if(c.getFamilyName() == null) {
+                if (c.getFamilyName() == null) {
                     list.add(null); // Skip PK column
                     continue;
                 }
@@ -108,14 +116,14 @@ public class FormatToKeyValueReducer
 
     @Override
     protected void reduce(TableRowkeyPair key, Iterable<ImmutableBytesWritable> values,
-        Reducer<TableRowkeyPair, ImmutableBytesWritable, TableRowkeyPair, KeyValue>.Context context)
-        throws IOException, InterruptedException {
+                          Reducer<TableRowkeyPair, ImmutableBytesWritable, TableRowkeyPair, KeyValue>.Context context)
+            throws IOException, InterruptedException {
         TreeSet<KeyValue> map = new TreeSet<KeyValue>(KeyValue.COMPARATOR);
         int tableIndex = tableNames.indexOf(key.getTableName());
         List<Pair<byte[], byte[]>> columns = columnIndexes.get(tableIndex);
         for (ImmutableBytesWritable aggregatedArray : values) {
             DataInputStream input = new DataInputStream(new ByteArrayInputStream(aggregatedArray.get()));
-            while (input.available()!= 0) {
+            while (input.available() != 0) {
                 int index = WritableUtils.readVInt(input);
                 Pair<byte[], byte[]> pair = columns.get(index);
                 byte type = input.readByte();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/952a01cc/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/JsonToKeyValueMapper.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/JsonToKeyValueMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/JsonToKeyValueMapper.java
index 5173a0e..2a51b2c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/JsonToKeyValueMapper.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/JsonToKeyValueMapper.java
@@ -37,7 +37,7 @@ import com.google.common.base.Preconditions;
  * extracting the created KeyValues and rolling back the statement execution before it is
  * committed to HBase.
  */
-public class JsonToKeyValueMapper extends FormatToKeyValueMapper<Map<?, ?>> {
+public class JsonToKeyValueMapper extends FormatToBytesWritableMapper<Map<?, ?>> {
 
     private LineParser<Map<?, ?>> lineParser;
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/952a01cc/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/bulkload/TargetTableRefFunctions.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/bulkload/TargetTableRefFunctions.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/bulkload/TargetTableRefFunctions.java
index e02065f..58725c2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/bulkload/TargetTableRefFunctions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/bulkload/TargetTableRefFunctions.java
@@ -78,7 +78,7 @@ public class TargetTableRefFunctions {
          }
      };
 
-    public static Function<List<TargetTableRef>,String> LOGICAN_NAMES_TO_JSON =  new Function<List<TargetTableRef>,String>() {
+    public static Function<List<TargetTableRef>,String> LOGICAL_NAMES_TO_JSON =  new Function<List<TargetTableRef>,String>() {
 
         @Override
         public String apply(List<TargetTableRef> input) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/952a01cc/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
index 280daa2..b1879d1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
@@ -36,7 +36,7 @@ import org.apache.hadoop.mapreduce.lib.db.DBInputFormat.NullDBWritable;
 import org.apache.hadoop.mapreduce.lib.db.DBWritable;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.phoenix.jdbc.PhoenixConnection;
-import org.apache.phoenix.mapreduce.FormatToKeyValueMapper;
+import org.apache.phoenix.mapreduce.FormatToBytesWritableMapper;
 import org.apache.phoenix.mapreduce.ImportPreUpsertKeyValueProcessor;
 import org.apache.phoenix.mapreduce.PhoenixInputFormat;
 import org.apache.phoenix.util.ColumnInfo;
@@ -418,7 +418,7 @@ public final class PhoenixConfigurationUtil {
         Class<? extends ImportPreUpsertKeyValueProcessor> processorClass = null;
         try {
             processorClass = conf.getClass(
-                    UPSERT_HOOK_CLASS_CONFKEY, FormatToKeyValueMapper.DefaultImportPreUpsertKeyValueProcessor.class,
+                    UPSERT_HOOK_CLASS_CONFKEY, FormatToBytesWritableMapper.DefaultImportPreUpsertKeyValueProcessor.class,
                     ImportPreUpsertKeyValueProcessor.class);
         } catch (Exception e) {
             throw new IllegalStateException("Couldn't load upsert hook class", e);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/952a01cc/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapperTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapperTest.java b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapperTest.java
new file mode 100644
index 0000000..6424976
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapperTest.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.schema.types.PInteger;
+import org.apache.phoenix.schema.types.PIntegerArray;
+import org.apache.phoenix.schema.types.PUnsignedInt;
+import org.apache.phoenix.util.ColumnInfo;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
+import static org.junit.Assert.assertEquals;
+
+public class FormatToBytesWritableMapperTest {
+
+    @Test
+    public void testBuildColumnInfoList() {
+        List<ColumnInfo> columnInfoList = ImmutableList.of(
+                new ColumnInfo("idCol", PInteger.INSTANCE.getSqlType()),
+                new ColumnInfo("unsignedIntCol", PUnsignedInt.INSTANCE.getSqlType()),
+                new ColumnInfo("stringArrayCol", PIntegerArray.INSTANCE.getSqlType()));
+
+        Configuration conf = new Configuration();
+        FormatToBytesWritableMapper.configureColumnInfoList(conf, columnInfoList);
+        List<ColumnInfo> fromConfig = FormatToBytesWritableMapper.buildColumnInfoList(conf);
+
+        assertEquals(columnInfoList, fromConfig);
+    }
+
+    @Test
+    public void testBuildColumnInfoList_ContainingNulls() {
+        // A null value in the column info list means "skip that column in the input"
+        List<ColumnInfo> columnInfoListWithNull = Lists.newArrayList(
+                new ColumnInfo("idCol", PInteger.INSTANCE.getSqlType()),
+                null,
+                new ColumnInfo("unsignedIntCol", PUnsignedInt.INSTANCE.getSqlType()),
+                new ColumnInfo("stringArrayCol", PIntegerArray.INSTANCE.getSqlType()));
+
+        Configuration conf = new Configuration();
+        FormatToBytesWritableMapper.configureColumnInfoList(conf, columnInfoListWithNull);
+        List<ColumnInfo> fromConfig = FormatToBytesWritableMapper.buildColumnInfoList(conf);
+
+        assertEquals(columnInfoListWithNull, fromConfig);
+    }
+
+    @Test
+    public void testLoadPreUpdateProcessor() {
+        Configuration conf = new Configuration();
+        conf.setClass(PhoenixConfigurationUtil.UPSERT_HOOK_CLASS_CONFKEY, MockUpsertProcessor.class,
+                ImportPreUpsertKeyValueProcessor.class);
+
+        ImportPreUpsertKeyValueProcessor processor = PhoenixConfigurationUtil.loadPreUpsertProcessor(conf);
+        assertEquals(MockUpsertProcessor.class, processor.getClass());
+    }
+
+    @Test
+    public void testLoadPreUpdateProcessor_NotConfigured() {
+
+        Configuration conf = new Configuration();
+        ImportPreUpsertKeyValueProcessor processor = PhoenixConfigurationUtil.loadPreUpsertProcessor(conf);
+
+        assertEquals(FormatToBytesWritableMapper.DefaultImportPreUpsertKeyValueProcessor.class,
+                processor.getClass());
+    }
+
+    @Test(expected=IllegalStateException.class)
+    public void testLoadPreUpdateProcessor_ClassNotFound() {
+        Configuration conf = new Configuration();
+        conf.set(PhoenixConfigurationUtil.UPSERT_HOOK_CLASS_CONFKEY, "MyUndefinedClass");
+
+        PhoenixConfigurationUtil.loadPreUpsertProcessor(conf);
+    }
+
+    static class MockUpsertProcessor implements ImportPreUpsertKeyValueProcessor {
+        @Override
+        public List<KeyValue> preUpsert(byte[] rowKey, List<KeyValue> keyValues) {
+            throw new UnsupportedOperationException("Not yet implemented");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/952a01cc/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/FormatToKeyValueMapperTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/FormatToKeyValueMapperTest.java b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/FormatToKeyValueMapperTest.java
deleted file mode 100644
index 3455616..0000000
--- a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/FormatToKeyValueMapperTest.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.mapreduce;
-
-import java.util.List;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
-import org.apache.phoenix.schema.types.PInteger;
-import org.apache.phoenix.schema.types.PIntegerArray;
-import org.apache.phoenix.schema.types.PUnsignedInt;
-import org.apache.phoenix.util.ColumnInfo;
-import org.junit.Test;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-
-import static org.junit.Assert.assertEquals;
-
-public class FormatToKeyValueMapperTest {
-
-    @Test
-    public void testBuildColumnInfoList() {
-        List<ColumnInfo> columnInfoList = ImmutableList.of(
-                new ColumnInfo("idCol", PInteger.INSTANCE.getSqlType()),
-                new ColumnInfo("unsignedIntCol", PUnsignedInt.INSTANCE.getSqlType()),
-                new ColumnInfo("stringArrayCol", PIntegerArray.INSTANCE.getSqlType()));
-
-        Configuration conf = new Configuration();
-        FormatToKeyValueMapper.configureColumnInfoList(conf, columnInfoList);
-        List<ColumnInfo> fromConfig = FormatToKeyValueMapper.buildColumnInfoList(conf);
-
-        assertEquals(columnInfoList, fromConfig);
-    }
-
-    @Test
-    public void testBuildColumnInfoList_ContainingNulls() {
-        // A null value in the column info list means "skip that column in the input"
-        List<ColumnInfo> columnInfoListWithNull = Lists.newArrayList(
-                new ColumnInfo("idCol", PInteger.INSTANCE.getSqlType()),
-                null,
-                new ColumnInfo("unsignedIntCol", PUnsignedInt.INSTANCE.getSqlType()),
-                new ColumnInfo("stringArrayCol", PIntegerArray.INSTANCE.getSqlType()));
-
-        Configuration conf = new Configuration();
-        FormatToKeyValueMapper.configureColumnInfoList(conf, columnInfoListWithNull);
-        List<ColumnInfo> fromConfig = FormatToKeyValueMapper.buildColumnInfoList(conf);
-
-        assertEquals(columnInfoListWithNull, fromConfig);
-    }
-
-    @Test
-    public void testLoadPreUpdateProcessor() {
-        Configuration conf = new Configuration();
-        conf.setClass(PhoenixConfigurationUtil.UPSERT_HOOK_CLASS_CONFKEY, MockUpsertProcessor.class,
-                ImportPreUpsertKeyValueProcessor.class);
-
-        ImportPreUpsertKeyValueProcessor processor = PhoenixConfigurationUtil.loadPreUpsertProcessor(conf);
-        assertEquals(MockUpsertProcessor.class, processor.getClass());
-    }
-
-    @Test
-    public void testLoadPreUpdateProcessor_NotConfigured() {
-
-        Configuration conf = new Configuration();
-        ImportPreUpsertKeyValueProcessor processor = PhoenixConfigurationUtil.loadPreUpsertProcessor(conf);
-
-        assertEquals(FormatToKeyValueMapper.DefaultImportPreUpsertKeyValueProcessor.class,
-                processor.getClass());
-    }
-
-    @Test(expected=IllegalStateException.class)
-    public void testLoadPreUpdateProcessor_ClassNotFound() {
-        Configuration conf = new Configuration();
-        conf.set(PhoenixConfigurationUtil.UPSERT_HOOK_CLASS_CONFKEY, "MyUndefinedClass");
-
-        PhoenixConfigurationUtil.loadPreUpsertProcessor(conf);
-    }
-
-    static class MockUpsertProcessor implements ImportPreUpsertKeyValueProcessor {
-        @Override
-        public List<KeyValue> preUpsert(byte[] rowKey, List<KeyValue> keyValues) {
-            throw new UnsupportedOperationException("Not yet implemented");
-        }
-    }
-}