Posted to commits@phoenix.apache.org by nd...@apache.org on 2015/12/08 00:02:07 UTC

[3/3] phoenix git commit: PHOENIX-2481 JSON bulkload tool

PHOENIX-2481 JSON bulkload tool


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/578979a1
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/578979a1
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/578979a1

Branch: refs/heads/master
Commit: 578979a1437124d53e319ad554b72793bcef6fd3
Parents: 8ce3b58
Author: Nick Dimiduk <nd...@apache.org>
Authored: Mon Nov 16 17:18:34 2015 -0800
Committer: Nick Dimiduk <nd...@apache.org>
Committed: Mon Dec 7 13:09:09 2015 -0800

----------------------------------------------------------------------
 .../phoenix/mapreduce/CsvBulkLoadToolIT.java    |  16 +-
 .../phoenix/mapreduce/AbstractBulkLoadTool.java | 402 +++++++++++++++
 .../phoenix/mapreduce/CsvBulkImportUtil.java    |  20 +-
 .../phoenix/mapreduce/CsvBulkLoadTool.java      | 514 +------------------
 .../phoenix/mapreduce/CsvToKeyValueMapper.java  | 226 +-------
 .../phoenix/mapreduce/CsvToKeyValueReducer.java |  55 --
 .../mapreduce/FormatToKeyValueMapper.java       | 259 ++++++++++
 .../mapreduce/FormatToKeyValueReducer.java      |  54 ++
 .../ImportPreUpsertKeyValueProcessor.java       |   3 +-
 .../phoenix/mapreduce/JsonBulkLoadTool.java     |  53 ++
 .../phoenix/mapreduce/JsonToKeyValueMapper.java |  75 +++
 .../mapreduce/MultiHfileOutputFormat.java       |  38 +-
 .../mapreduce/bulkload/CsvTableRowkeyPair.java  | 139 -----
 .../mapreduce/bulkload/TableRowkeyPair.java     | 134 +++++
 .../mapreduce/bulkload/TargetTableRef.java      |  70 +++
 .../bulkload/TargetTableRefFunctions.java       |  95 ++++
 .../util/PhoenixConfigurationUtil.java          |  15 +-
 .../apache/phoenix/util/CSVCommonsLoader.java   | 160 +-----
 .../org/apache/phoenix/util/SchemaUtil.java     | 145 +++++-
 .../org/apache/phoenix/util/UpsertExecutor.java | 156 ++++++
 .../phoenix/util/csv/CsvUpsertExecutor.java     | 131 +----
 .../phoenix/util/json/JsonUpsertExecutor.java   | 209 ++++++++
 .../util/json/ObjectToArrayConverter.java       |  69 +++
 .../phoenix/mapreduce/BulkLoadToolTest.java     |  78 +++
 .../mapreduce/CsvBulkImportUtilTest.java        |  18 +-
 .../phoenix/mapreduce/CsvBulkLoadToolTest.java  |  69 ---
 .../mapreduce/CsvToKeyValueMapperTest.java      |  84 +--
 .../mapreduce/FormatToKeyValueMapperTest.java   | 102 ++++
 .../util/AbstractUpsertExecutorTest.java        | 136 +++++
 .../phoenix/util/csv/CsvUpsertExecutorTest.java | 144 ++----
 .../util/json/JsonUpsertExecutorTest.java       |  53 ++
 31 files changed, 2222 insertions(+), 1500 deletions(-)
----------------------------------------------------------------------
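
As a point of reference, here is a minimal sketch of driving the new JSON loader programmatically, assuming the JsonBulkLoadTool added in this commit follows the same ToolRunner pattern as CsvBulkLoadTool shown further down in this diff. The -t, -i and -z option names come from AbstractBulkLoadTool below; the table name, input path and ZooKeeper quorum are placeholders, and the usual "hadoop jar" command-line invocation works the same way.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.util.ToolRunner;
    import org.apache.phoenix.mapreduce.JsonBulkLoadTool;

    public class RunJsonBulkLoad {
        public static void main(String[] args) throws Exception {
            Configuration conf = HBaseConfiguration.create();
            // -t, -i and -z are declared in AbstractBulkLoadTool; the values are placeholders.
            int exitCode = ToolRunner.run(conf, new JsonBulkLoadTool(), new String[] {
                    "-t", "EXAMPLE_TABLE",
                    "-i", "/data/input/example.json",
                    "-z", "zkhost:2181"
            });
            System.exit(exitCode);
        }
    }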


http://git-wip-us.apache.org/repos/asf/phoenix/blob/578979a1/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/CsvBulkLoadToolIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/CsvBulkLoadToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/CsvBulkLoadToolIT.java
index 0e74d7b..a5b7488 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/CsvBulkLoadToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/CsvBulkLoadToolIT.java
@@ -17,14 +17,6 @@
  */
 package org.apache.phoenix.mapreduce;
 
-import static org.apache.phoenix.query.BaseTest.setUpConfigForMiniCluster;
-import static org.apache.phoenix.query.QueryServices.DATE_FORMAT_ATTRIB;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
 import java.io.PrintWriter;
 import java.sql.Connection;
 import java.sql.DriverManager;
@@ -47,6 +39,14 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import static org.apache.phoenix.query.BaseTest.setUpConfigForMiniCluster;
+import static org.apache.phoenix.query.QueryServices.DATE_FORMAT_ATTRIB;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 @Category(NeedsOwnMiniClusterTest.class)
 public class CsvBulkLoadToolIT {
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/578979a1/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
new file mode 100644
index 0000000..cf9ddef
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
@@ -0,0 +1,402 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.cli.PosixParser;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.jdbc.PhoenixDriver;
+import org.apache.phoenix.mapreduce.bulkload.TableRowkeyPair;
+import org.apache.phoenix.mapreduce.bulkload.TargetTableRef;
+import org.apache.phoenix.mapreduce.bulkload.TargetTableRefFunctions;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.util.ColumnInfo;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.StringUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * Base tool for running MapReduce-based ingests of data.
+ */
+public abstract class AbstractBulkLoadTool extends Configured implements Tool {
+
+    protected static final Logger LOG = LoggerFactory.getLogger(AbstractBulkLoadTool.class);
+
+    static final Option ZK_QUORUM_OPT = new Option("z", "zookeeper", true, "Supply zookeeper connection details (optional)");
+    static final Option INPUT_PATH_OPT = new Option("i", "input", true, "Input path (mandatory)");
+    static final Option OUTPUT_PATH_OPT = new Option("o", "output", true, "Output path for temporary HFiles (optional)");
+    static final Option SCHEMA_NAME_OPT = new Option("s", "schema", true, "Phoenix schema name (optional)");
+    static final Option TABLE_NAME_OPT = new Option("t", "table", true, "Phoenix table name (mandatory)");
+    static final Option INDEX_TABLE_NAME_OPT = new Option("it", "index-table", true, "Phoenix index table name when just loading this particular index table");
+    static final Option IMPORT_COLUMNS_OPT = new Option("c", "import-columns", true, "Comma-separated list of columns to be imported");
+    static final Option IGNORE_ERRORS_OPT = new Option("g", "ignore-errors", false, "Ignore input errors");
+    static final Option HELP_OPT = new Option("h", "help", false, "Show this help and quit");
+
+    /**
+     * Set configuration values based on parsed command line options.
+     *
+     * @param cmdLine supplied command line options
+     * @param importColumns descriptors of columns to be imported
+     * @param conf job configuration
+     */
+    protected abstract void configureOptions(CommandLine cmdLine, List<ColumnInfo> importColumns,
+                                         Configuration conf) throws SQLException;
+    protected abstract void setupJob(Job job);
+
+    protected Options getOptions() {
+        Options options = new Options();
+        options.addOption(INPUT_PATH_OPT);
+        options.addOption(TABLE_NAME_OPT);
+        options.addOption(INDEX_TABLE_NAME_OPT);
+        options.addOption(ZK_QUORUM_OPT);
+        options.addOption(OUTPUT_PATH_OPT);
+        options.addOption(SCHEMA_NAME_OPT);
+        options.addOption(IMPORT_COLUMNS_OPT);
+        options.addOption(IGNORE_ERRORS_OPT);
+        options.addOption(HELP_OPT);
+        return options;
+    }
+
+    /**
+     * Parses the commandline arguments, throws IllegalStateException if mandatory arguments are
+     * missing.
+     *
+     * @param args supplied command line arguments
+     * @return the parsed command line
+     */
+    protected CommandLine parseOptions(String[] args) {
+
+        Options options = getOptions();
+
+        CommandLineParser parser = new PosixParser();
+        CommandLine cmdLine = null;
+        try {
+            cmdLine = parser.parse(options, args);
+        } catch (ParseException e) {
+            printHelpAndExit("Error parsing command line options: " + e.getMessage(), options);
+        }
+
+        if (cmdLine.hasOption(HELP_OPT.getOpt())) {
+            printHelpAndExit(options, 0);
+        }
+
+        if (!cmdLine.hasOption(TABLE_NAME_OPT.getOpt())) {
+            throw new IllegalStateException(TABLE_NAME_OPT.getLongOpt() + " is a mandatory " +
+                    "parameter");
+        }
+
+        if (!cmdLine.getArgList().isEmpty()) {
+            throw new IllegalStateException("Got unexpected extra parameters: "
+                    + cmdLine.getArgList());
+        }
+
+        if (!cmdLine.hasOption(INPUT_PATH_OPT.getOpt())) {
+            throw new IllegalStateException(INPUT_PATH_OPT.getLongOpt() + " is a mandatory " +
+                    "parameter");
+        }
+
+        return cmdLine;
+    }
+
+    private void printHelpAndExit(String errorMessage, Options options) {
+        System.err.println(errorMessage);
+        printHelpAndExit(options, 1);
+    }
+
+    private void printHelpAndExit(Options options, int exitCode) {
+        HelpFormatter formatter = new HelpFormatter();
+        formatter.printHelp("help", options);
+        System.exit(exitCode);
+    }
+
+    @Override
+    public int run(String[] args) throws Exception {
+
+        Configuration conf = HBaseConfiguration.create(getConf());
+
+        CommandLine cmdLine = null;
+        try {
+            cmdLine = parseOptions(args);
+        } catch (IllegalStateException e) {
+            printHelpAndExit(e.getMessage(), getOptions());
+        }
+        return loadData(conf, cmdLine);
+    }
+
+    private int loadData(Configuration conf, CommandLine cmdLine) throws SQLException,
+            InterruptedException, ExecutionException, ClassNotFoundException {
+        String tableName = cmdLine.getOptionValue(TABLE_NAME_OPT.getOpt());
+        String schemaName = cmdLine.getOptionValue(SCHEMA_NAME_OPT.getOpt());
+        String indexTableName = cmdLine.getOptionValue(INDEX_TABLE_NAME_OPT.getOpt());
+        String qualifiedTableName = getQualifiedTableName(schemaName, tableName);
+        String qualifiedIndexTableName = null;
+        if (indexTableName != null){
+            qualifiedIndexTableName = getQualifiedTableName(schemaName, indexTableName);
+        }
+
+        if (cmdLine.hasOption(ZK_QUORUM_OPT.getOpt())) {
+            // ZK_QUORUM_OPT is optional, but if it's there, use it for both the conn and the job.
+            String zkQuorum = cmdLine.getOptionValue(ZK_QUORUM_OPT.getOpt());
+            PhoenixDriver.ConnectionInfo info = PhoenixDriver.ConnectionInfo.create(zkQuorum);
+            LOG.info("Configuring HBase connection to {}", info);
+            for (Map.Entry<String,String> entry : info.asProps()) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Setting {} = {}", entry.getKey(), entry.getValue());
+                }
+                conf.set(entry.getKey(), entry.getValue());
+            }
+        }
+
+        final Connection conn = QueryUtil.getConnection(conf);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Reading columns from {} :: {}", ((PhoenixConnection) conn).getURL(),
+                    qualifiedTableName);
+        }
+        List<ColumnInfo> importColumns = buildImportColumns(conn, cmdLine, qualifiedTableName);
+        Preconditions.checkNotNull(importColumns);
+        Preconditions.checkArgument(!importColumns.isEmpty(), "Column info list is empty");
+        FormatToKeyValueMapper.configureColumnInfoList(conf, importColumns);
+        boolean ignoreInvalidRows = cmdLine.hasOption(IGNORE_ERRORS_OPT.getOpt());
+        conf.setBoolean(FormatToKeyValueMapper.IGNORE_INVALID_ROW_CONFKEY, ignoreInvalidRows);
+        conf.set(FormatToKeyValueMapper.TABLE_NAME_CONFKEY, tableName);
+
+        // give subclasses their hook
+        configureOptions(cmdLine, importColumns, conf);
+        try {
+            validateTable(conn, schemaName, tableName);
+        } finally {
+            conn.close();
+        }
+
+        final Path inputPath = new Path(cmdLine.getOptionValue(INPUT_PATH_OPT.getOpt()));
+        final Path outputPath;
+        if (cmdLine.hasOption(OUTPUT_PATH_OPT.getOpt())) {
+            outputPath = new Path(cmdLine.getOptionValue(OUTPUT_PATH_OPT.getOpt()));
+        } else {
+            outputPath = new Path("/tmp/" + UUID.randomUUID());
+        }
+
+        List<TargetTableRef> tablesToBeLoaded = new ArrayList<TargetTableRef>();
+        tablesToBeLoaded.add(new TargetTableRef(qualifiedTableName));
+        // using conn after it's been closed... o.O
+        tablesToBeLoaded.addAll(getIndexTables(conn, schemaName, qualifiedTableName));
+
+        // When loading a single index table, check index table name is correct
+        if (qualifiedIndexTableName != null){
+            TargetTableRef targetIndexRef = null;
+            for (TargetTableRef tmpTable : tablesToBeLoaded){
+                if (tmpTable.getLogicalName().compareToIgnoreCase(qualifiedIndexTableName) == 0) {
+                    targetIndexRef = tmpTable;
+                    break;
+                }
+            }
+            if (targetIndexRef == null){
+                throw new IllegalStateException("Bulk Loader error: index table " +
+                        qualifiedIndexTableName + " doesn't exist");
+            }
+            tablesToBeLoaded.clear();
+            tablesToBeLoaded.add(targetIndexRef);
+        }
+
+        return submitJob(conf, tableName, inputPath, outputPath, tablesToBeLoaded);
+    }
+
+    /**
+     * Submits the jobs to the cluster.
+     * Loads the HFiles onto the respective tables.
+     */
+    public int submitJob(final Configuration conf, final String qualifiedTableName, final Path inputPath,
+                         final Path outputPath , List<TargetTableRef> tablesToBeLoaded) {
+        try {
+            Job job = new Job(conf, "Phoenix MapReduce import for " + qualifiedTableName);
+            FileInputFormat.addInputPath(job, inputPath);
+            FileOutputFormat.setOutputPath(job, outputPath);
+
+            job.setInputFormatClass(TextInputFormat.class);
+            job.setMapOutputKeyClass(TableRowkeyPair.class);
+            job.setMapOutputValueClass(KeyValue.class);
+            job.setOutputKeyClass(TableRowkeyPair.class);
+            job.setOutputValueClass(KeyValue.class);
+            job.setReducerClass(FormatToKeyValueReducer.class);
+
+            MultiHfileOutputFormat.configureIncrementalLoad(job, tablesToBeLoaded);
+
+            final String tableNamesAsJson = TargetTableRefFunctions.NAMES_TO_JSON.apply(tablesToBeLoaded);
+            job.getConfiguration().set(FormatToKeyValueMapper.TABLE_NAMES_CONFKEY,tableNamesAsJson);
+
+            // give subclasses their hook
+            setupJob(job);
+
+            LOG.info("Running MapReduce import job from {} to {}", inputPath, outputPath);
+            boolean success = job.waitForCompletion(true);
+
+            if (success) {
+                LOG.info("Loading HFiles from {}", outputPath);
+                completebulkload(conf,outputPath,tablesToBeLoaded);
+            }
+
+            LOG.info("Removing output directory {}", outputPath);
+            if (!FileSystem.get(conf).delete(outputPath, true)) {
+                LOG.error("Removing output directory {} failed", outputPath);
+            }
+            return 0;
+        } catch(Exception e) {
+            LOG.error("Error {} occurred submitting BulkLoad ",e.getMessage());
+            return -1;
+        }
+
+    }
+
+    private void completebulkload(Configuration conf,Path outputPath , List<TargetTableRef> tablesToBeLoaded) throws Exception {
+        for(TargetTableRef table : tablesToBeLoaded) {
+            LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
+            String tableName = table.getPhysicalName();
+            Path tableOutputPath = new Path(outputPath,tableName);
+            HTable htable = new HTable(conf,tableName);
+            LOG.info("Loading HFiles for {} from {}", tableName , tableOutputPath);
+            loader.doBulkLoad(tableOutputPath, htable);
+            LOG.info("Incremental load complete for table=" + tableName);
+        }
+    }
+
+    /**
+     * Build up the list of columns to be imported. The list is taken from the command line if
+     * present, otherwise it is taken from the table description.
+     *
+     * @param conn connection to Phoenix
+     * @param cmdLine supplied command line options
+     * @param qualifiedTableName table name (possibly with schema) of the table to be imported
+     * @return the list of columns to be imported
+     */
+    List<ColumnInfo> buildImportColumns(Connection conn, CommandLine cmdLine,
+                                        String qualifiedTableName) throws SQLException {
+        List<String> userSuppliedColumnNames = null;
+        if (cmdLine.hasOption(IMPORT_COLUMNS_OPT.getOpt())) {
+            userSuppliedColumnNames = Lists.newArrayList(
+                    Splitter.on(",").trimResults().split
+                            (cmdLine.getOptionValue(IMPORT_COLUMNS_OPT.getOpt())));
+        }
+        return SchemaUtil.generateColumnInfo(
+                conn, qualifiedTableName, userSuppliedColumnNames, true);
+    }
+
+    /**
+     * Calculate the HBase HTable name for which the import is to be done.
+     *
+     * @param schemaName import schema name, can be null
+     * @param tableName import table name
+     * @return the byte representation of the import HTable
+     */
+    @VisibleForTesting
+    static String getQualifiedTableName(String schemaName, String tableName) {
+        if (schemaName != null) {
+            return String.format("%s.%s", SchemaUtil.normalizeIdentifier(schemaName),
+                    SchemaUtil.normalizeIdentifier(tableName));
+        } else {
+            return SchemaUtil.normalizeIdentifier(tableName);
+        }
+    }
+
+    /**
+     * Perform any required validation on the table being bulk loaded into:
+     * - ensure no column family names start with '_', as they'd be ignored leading to problems.
+     * @throws java.sql.SQLException
+     */
+    private void validateTable(Connection conn, String schemaName,
+                               String tableName) throws SQLException {
+
+        ResultSet rs = conn.getMetaData().getColumns(
+                null, StringUtil.escapeLike(schemaName),
+                StringUtil.escapeLike(tableName), null);
+        while (rs.next()) {
+            String familyName = rs.getString(PhoenixDatabaseMetaData.COLUMN_FAMILY);
+            if (familyName != null && familyName.startsWith("_")) {
+                if (QueryConstants.DEFAULT_COLUMN_FAMILY.equals(familyName)) {
+                    throw new IllegalStateException(
+                            "Bulk Loader error: All column names that are not part of the " +
+                                    "primary key constraint must be prefixed with a column family " +
+                                    "name (i.e. f.my_column VARCHAR)");
+                } else {
+                    throw new IllegalStateException("Bulk Loader error: Column family name " +
+                            "must not start with '_': " + familyName);
+                }
+            }
+        }
+        rs.close();
+    }
+
+    /**
+     * Get the index tables of current data table
+     * @throws java.sql.SQLException
+     */
+    private List<TargetTableRef> getIndexTables(Connection conn, String schemaName, String qualifiedTableName)
+            throws SQLException {
+        PTable table = PhoenixRuntime.getTable(conn, qualifiedTableName);
+        List<TargetTableRef> indexTables = new ArrayList<TargetTableRef>();
+        for(PTable indexTable : table.getIndexes()){
+            if (indexTable.getIndexType() == PTable.IndexType.LOCAL) {
+                throw new UnsupportedOperationException("Local indexes not supported by Bulk Loader");
+                /*indexTables.add(
+                        new TargetTableRef(getQualifiedTableName(schemaName,
+                                indexTable.getTableName().getString()),
+                                MetaDataUtil.getLocalIndexTableName(qualifiedTableName))); */
+            } else {
+                indexTables.add(new TargetTableRef(getQualifiedTableName(schemaName,
+                        indexTable.getTableName().getString())));
+            }
+        }
+        return indexTables;
+    }
+
+}
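
The JsonBulkLoadTool and JsonToKeyValueMapper listed in the summary are not reproduced in this part of the diff. As a rough sketch only (the bodies below are assumptions, not the committed source), a format-specific subclass of AbstractBulkLoadTool just fills in the two abstract hooks:

    package org.apache.phoenix.mapreduce;

    import java.sql.SQLException;
    import java.util.List;

    import org.apache.commons.cli.CommandLine;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.util.ToolRunner;
    import org.apache.phoenix.util.ColumnInfo;

    // Sketch of a JSON-flavored subclass; details are assumed, not taken from the commit.
    public class JsonBulkLoadTool extends AbstractBulkLoadTool {

        @Override
        protected void configureOptions(CommandLine cmdLine, List<ColumnInfo> importColumns,
                                        Configuration conf) throws SQLException {
            // JSON input has no delimiter/quote/escape settings to push into the job config.
        }

        @Override
        protected void setupJob(Job job) {
            // Allow overriding the job jar setting with a -D system property, as the CSV tool does.
            if (job.getJar() == null) {
                job.setJarByClass(JsonToKeyValueMapper.class);
            }
            job.setMapperClass(JsonToKeyValueMapper.class);
        }

        public static void main(String[] args) throws Exception {
            System.exit(ToolRunner.run(new JsonBulkLoadTool(), args));
        }
    }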

http://git-wip-us.apache.org/repos/asf/phoenix/blob/578979a1/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkImportUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkImportUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkImportUtil.java
index 6d77cd5..bdc67f9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkImportUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkImportUtil.java
@@ -17,15 +17,11 @@
  */
 package org.apache.phoenix.mapreduce;
 
-import java.util.List;
-
-import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.util.Base64;
 import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
-import org.apache.phoenix.util.ColumnInfo;
 
-import com.google.common.base.Preconditions;
+import com.google.common.annotations.VisibleForTesting;
 
 /**
  * Collection of utility methods for setting up bulk import jobs.
@@ -36,29 +32,19 @@ public class CsvBulkImportUtil {
      * Configure a job configuration for a bulk CSV import.
      *
      * @param conf job configuration to be set up
-     * @param tableName name of the table to be imported to, can include a schema name
      * @param fieldDelimiter field delimiter character for the CSV input
      * @param quoteChar quote character for the CSV input
      * @param escapeChar escape character for the CSV input
      * @param arrayDelimiter array delimiter character, can be null
-     * @param columnInfoList list of columns to be imported
-     * @param ignoreInvalidRows flag to ignore invalid input rows
      */
-    public static void initCsvImportJob(Configuration conf, String tableName, char fieldDelimiter, char quoteChar, char escapeChar,
-            String arrayDelimiter,  List<ColumnInfo> columnInfoList, boolean ignoreInvalidRows) {
-
-        Preconditions.checkNotNull(tableName);
-        Preconditions.checkNotNull(columnInfoList);
-        Preconditions.checkArgument(!columnInfoList.isEmpty(), "Column info list is empty");
-        conf.set(CsvToKeyValueMapper.TABLE_NAME_CONFKEY, tableName);
+    public static void initCsvImportJob(Configuration conf, char fieldDelimiter, char quoteChar,
+            char escapeChar, String arrayDelimiter) {
         setChar(conf, CsvToKeyValueMapper.FIELD_DELIMITER_CONFKEY, fieldDelimiter);
         setChar(conf, CsvToKeyValueMapper.QUOTE_CHAR_CONFKEY, quoteChar);
         setChar(conf, CsvToKeyValueMapper.ESCAPE_CHAR_CONFKEY, escapeChar);
         if (arrayDelimiter != null) {
             conf.set(CsvToKeyValueMapper.ARRAY_DELIMITER_CONFKEY, arrayDelimiter);
         }
-        CsvToKeyValueMapper.configureColumnInfoList(conf, columnInfoList);
-        conf.setBoolean(CsvToKeyValueMapper.IGNORE_INVALID_ROW_CONFKEY, ignoreInvalidRows);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/phoenix/blob/578979a1/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java
index 20f05ff..e0b083e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java
@@ -17,364 +17,37 @@
  */
 package org.apache.phoenix.mapreduce;
 
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.ResultSet;
 import java.sql.SQLException;
-import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.ExecutionException;
 
 import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
-import org.apache.commons.cli.HelpFormatter;
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
-import org.apache.commons.cli.PosixParser;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
 import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
-import org.apache.phoenix.jdbc.PhoenixConnection;
-import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
-import org.apache.phoenix.jdbc.PhoenixDriver;
-import org.apache.phoenix.mapreduce.bulkload.CsvTableRowkeyPair;
-import org.apache.phoenix.query.QueryConstants;
-import org.apache.phoenix.schema.PTable;
-import org.apache.phoenix.schema.PTable.IndexType;
-import org.apache.phoenix.util.CSVCommonsLoader;
 import org.apache.phoenix.util.ColumnInfo;
-import org.apache.phoenix.util.PhoenixRuntime;
-import org.apache.phoenix.util.QueryUtil;
-import org.apache.phoenix.util.SchemaUtil;
-import org.apache.phoenix.util.StringUtil;
-import org.codehaus.jackson.annotate.JsonCreator;
-import org.codehaus.jackson.annotate.JsonProperty;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.base.Splitter;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
+public class CsvBulkLoadTool extends AbstractBulkLoadTool {
 
-/**
- * Base tool for running MapReduce-based ingests of data.
- */
-@SuppressWarnings("deprecation")
-public class CsvBulkLoadTool extends Configured implements Tool {
-
-    private static final Logger LOG = LoggerFactory.getLogger(CsvBulkLoadTool.class);
-
-    static final Option ZK_QUORUM_OPT = new Option("z", "zookeeper", true, "Supply zookeeper connection details (optional)");
-    static final Option INPUT_PATH_OPT = new Option("i", "input", true, "Input CSV path (mandatory)");
-    static final Option OUTPUT_PATH_OPT = new Option("o", "output", true, "Output path for temporary HFiles (optional)");
-    static final Option SCHEMA_NAME_OPT = new Option("s", "schema", true, "Phoenix schema name (optional)");
-    static final Option TABLE_NAME_OPT = new Option("t", "table", true, "Phoenix table name (mandatory)");
-    static final Option INDEX_TABLE_NAME_OPT = new Option("it", "index-table", true, "Phoenix index table name when just loading this particualar index table");
     static final Option DELIMITER_OPT = new Option("d", "delimiter", true, "Input delimiter, defaults to comma");
     static final Option QUOTE_OPT = new Option("q", "quote", true, "Supply a custom phrase delimiter, defaults to double quote character");
     static final Option ESCAPE_OPT = new Option("e", "escape", true, "Supply a custom escape character, default is a backslash");
     static final Option ARRAY_DELIMITER_OPT = new Option("a", "array-delimiter", true, "Array element delimiter (optional)");
-    static final Option IMPORT_COLUMNS_OPT = new Option("c", "import-columns", true, "Comma-separated list of columns to be imported");
-    static final Option IGNORE_ERRORS_OPT = new Option("g", "ignore-errors", false, "Ignore input errors");
-    static final Option HELP_OPT = new Option("h", "help", false, "Show this help and quit");
-
-    public static void main(String[] args) throws Exception {
-        int exitStatus = ToolRunner.run(new CsvBulkLoadTool(), args);
-        System.exit(exitStatus);
-    }
-
-    /**
-     * Parses the commandline arguments, throws IllegalStateException if mandatory arguments are
-     * missing.
-     *
-     * @param args supplied command line arguments
-     * @return the parsed command line
-     */
-    CommandLine parseOptions(String[] args) {
-
-        Options options = getOptions();
-
-        CommandLineParser parser = new PosixParser();
-        CommandLine cmdLine = null;
-        try {
-            cmdLine = parser.parse(options, args);
-        } catch (ParseException e) {
-            printHelpAndExit("Error parsing command line options: " + e.getMessage(), options);
-        }
-
-        if (cmdLine.hasOption(HELP_OPT.getOpt())) {
-            printHelpAndExit(options, 0);
-        }
-
-        if (!cmdLine.hasOption(TABLE_NAME_OPT.getOpt())) {
-            throw new IllegalStateException(TABLE_NAME_OPT.getLongOpt() + " is a mandatory " +
-                    "parameter");
-        }
-
-        if (!cmdLine.getArgList().isEmpty()) {
-            throw new IllegalStateException("Got unexpected extra parameters: "
-                    + cmdLine.getArgList());
-        }
-
-        if (!cmdLine.hasOption(INPUT_PATH_OPT.getOpt())) {
-            throw new IllegalStateException(INPUT_PATH_OPT.getLongOpt() + " is a mandatory " +
-                    "parameter");
-        }
 
-        return cmdLine;
-    }
-
-    private Options getOptions() {
-        Options options = new Options();
-        options.addOption(INPUT_PATH_OPT);
-        options.addOption(TABLE_NAME_OPT);
-        options.addOption(INDEX_TABLE_NAME_OPT);
-        options.addOption(ZK_QUORUM_OPT);
-        options.addOption(OUTPUT_PATH_OPT);
-        options.addOption(SCHEMA_NAME_OPT);
+    @Override
+    protected Options getOptions() {
+        Options options = super.getOptions();
         options.addOption(DELIMITER_OPT);
         options.addOption(QUOTE_OPT);
         options.addOption(ESCAPE_OPT);
         options.addOption(ARRAY_DELIMITER_OPT);
-        options.addOption(IMPORT_COLUMNS_OPT);
-        options.addOption(IGNORE_ERRORS_OPT);
-        options.addOption(HELP_OPT);
         return options;
     }
 
-
-    private void printHelpAndExit(String errorMessage, Options options) {
-        System.err.println(errorMessage);
-        printHelpAndExit(options, 1);
-    }
-
-    private void printHelpAndExit(Options options, int exitCode) {
-        HelpFormatter formatter = new HelpFormatter();
-        formatter.printHelp("help", options);
-        System.exit(exitCode);
-    }
-
     @Override
-    public int run(String[] args) throws Exception {
-
-        Configuration conf = HBaseConfiguration.create(getConf());
-
-        CommandLine cmdLine = null;
-        try {
-            cmdLine = parseOptions(args);
-        } catch (IllegalStateException e) {
-            printHelpAndExit(e.getMessage(), getOptions());
-        }
-        return loadData(conf, cmdLine);
-    }
-
-	private int loadData(Configuration conf, CommandLine cmdLine) throws SQLException,
-            InterruptedException, ExecutionException, ClassNotFoundException {
-        String tableName = cmdLine.getOptionValue(TABLE_NAME_OPT.getOpt());
-        String schemaName = cmdLine.getOptionValue(SCHEMA_NAME_OPT.getOpt());
-        String indexTableName = cmdLine.getOptionValue(INDEX_TABLE_NAME_OPT.getOpt());
-        String qualifiedTableName = getQualifiedTableName(schemaName, tableName);
-        String qualifiedIndexTableName = null;
-        if (indexTableName != null){
-        	qualifiedIndexTableName = getQualifiedTableName(schemaName, indexTableName);
-        }
-
-        if (cmdLine.hasOption(ZK_QUORUM_OPT.getOpt())) {
-            // ZK_QUORUM_OPT is optional, but if it's there, use it for both the conn and the job.
-            String zkQuorum = cmdLine.getOptionValue(ZK_QUORUM_OPT.getOpt());
-            PhoenixDriver.ConnectionInfo info = PhoenixDriver.ConnectionInfo.create(zkQuorum);
-            LOG.info("Configuring HBase connection to {}", info);
-            for (Map.Entry<String,String> entry : info.asProps()) {
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("Setting {} = {}", entry.getKey(), entry.getValue());
-                }
-                conf.set(entry.getKey(), entry.getValue());
-            }
-        }
-
-        final Connection conn = QueryUtil.getConnection(conf);
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Reading columns from {} :: {}", ((PhoenixConnection) conn).getURL(),
-                    qualifiedTableName);
-        }
-        List<ColumnInfo> importColumns = buildImportColumns(conn, cmdLine, qualifiedTableName);
-        configureOptions(cmdLine, importColumns, conf);
-        try {
-            validateTable(conn, schemaName, tableName);
-        } finally {
-            conn.close();
-        }
-
-        final Path inputPath = new Path(cmdLine.getOptionValue(INPUT_PATH_OPT.getOpt()));
-        final Path outputPath;
-        if (cmdLine.hasOption(OUTPUT_PATH_OPT.getOpt())) {
-            outputPath = new Path(cmdLine.getOptionValue(OUTPUT_PATH_OPT.getOpt()));
-        } else {
-            outputPath = new Path("/tmp/" + UUID.randomUUID());
-        }
-        
-        List<TargetTableRef> tablesToBeLoaded = new ArrayList<TargetTableRef>();
-        tablesToBeLoaded.add(new TargetTableRef(qualifiedTableName));
-        // using conn after it's been closed... o.O
-        tablesToBeLoaded.addAll(getIndexTables(conn, schemaName, qualifiedTableName));
-        
-        // When loading a single index table, check index table name is correct
-        if (qualifiedIndexTableName != null){
-            TargetTableRef targetIndexRef = null;
-        	for (TargetTableRef tmpTable : tablesToBeLoaded){
-        		if (tmpTable.getLogicalName().compareToIgnoreCase(qualifiedIndexTableName) == 0) {
-                    targetIndexRef = tmpTable;
-        			break;
-        		}
-        	}
-        	if (targetIndexRef == null){
-                throw new IllegalStateException("CSV Bulk Loader error: index table " +
-                    qualifiedIndexTableName + " doesn't exist");
-        	}
-        	tablesToBeLoaded.clear();
-        	tablesToBeLoaded.add(targetIndexRef);
-        }
-        
-        return submitJob(conf, tableName, inputPath, outputPath, tablesToBeLoaded);
-	}
-	
-	/**
-	 * Submits the jobs to the cluster. 
-	 * Loads the HFiles onto the respective tables.
-	 * @param configuration
-	 * @param qualifiedTableName
-	 * @param inputPath
-	 * @param outputPath
-	 * @param tablesToBeoaded
-	 * @return status 
-	 */
-	public int submitJob(final Configuration conf, final String qualifiedTableName, final Path inputPath,
-	                        final Path outputPath , List<TargetTableRef> tablesToBeLoaded) {
-	    try {
-	        Job job = new Job(conf, "Phoenix MapReduce import for " + qualifiedTableName);
-    
-            // Allow overriding the job jar setting by using a -D system property at startup
-            if (job.getJar() == null) {
-                job.setJarByClass(CsvToKeyValueMapper.class);
-            }
-            job.setInputFormatClass(TextInputFormat.class);
-            FileInputFormat.addInputPath(job, inputPath);
-            FileOutputFormat.setOutputPath(job, outputPath);
-            job.setMapperClass(CsvToKeyValueMapper.class);
-            job.setMapOutputKeyClass(CsvTableRowkeyPair.class);
-            job.setMapOutputValueClass(KeyValue.class);
-            job.setOutputKeyClass(CsvTableRowkeyPair.class);
-            job.setOutputValueClass(KeyValue.class);
-            job.setReducerClass(CsvToKeyValueReducer.class);
-          
-            MultiHfileOutputFormat.configureIncrementalLoad(job, tablesToBeLoaded);
-    
-            final String tableNamesAsJson = TargetTableRefFunctions.NAMES_TO_JSON.apply(tablesToBeLoaded);
-            job.getConfiguration().set(CsvToKeyValueMapper.TABLE_NAMES_CONFKEY,tableNamesAsJson);
-            
-            LOG.info("Running MapReduce import job from {} to {}", inputPath, outputPath);
-            boolean success = job.waitForCompletion(true);
-            
-            if (success) {
-               LOG.info("Loading HFiles from {}", outputPath);
-               completebulkload(conf,outputPath,tablesToBeLoaded);
-            }
-        
-           LOG.info("Removing output directory {}", outputPath);
-           if (!FileSystem.get(conf).delete(outputPath, true)) {
-               LOG.error("Removing output directory {} failed", outputPath);
-           }
-           return 0;
-        } catch(Exception e) {
-            LOG.error("Error {} occurred submitting CSVBulkLoad ",e.getMessage());
-            return -1;
-        }
-	    
-	}
-	
-	/**
-	 * bulkload HFiles .
-	 * @param conf
-	 * @param outputPath
-	 * @param tablesToBeLoaded
-	 * @throws Exception
-	 */
-	private void completebulkload(Configuration conf,Path outputPath , List<TargetTableRef> tablesToBeLoaded) throws Exception {
-	    for(TargetTableRef table : tablesToBeLoaded) {
-	        LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
-            String tableName = table.getPhysicalName();
-            Path tableOutputPath = new Path(outputPath,tableName);
-            HTable htable = new HTable(conf,tableName);
-            LOG.info("Loading HFiles for {} from {}", tableName , tableOutputPath);
-            loader.doBulkLoad(tableOutputPath, htable);
-            LOG.info("Incremental load complete for table=" + tableName);
-        }
-	}
-
-    /**
-     * Build up the list of columns to be imported. The list is taken from the command line if
-     * present, otherwise it is taken from the table description.
-     *
-     * @param conn connection to Phoenix
-     * @param cmdLine supplied command line options
-     * @param qualifiedTableName table name (possibly with schema) of the table to be imported
-     * @return the list of columns to be imported
-     */
-    List<ColumnInfo> buildImportColumns(Connection conn, CommandLine cmdLine,
-            String qualifiedTableName) throws SQLException {
-        List<String> userSuppliedColumnNames = null;
-        if (cmdLine.hasOption(IMPORT_COLUMNS_OPT.getOpt())) {
-            userSuppliedColumnNames = Lists.newArrayList(
-                    Splitter.on(",").trimResults().split
-                            (cmdLine.getOptionValue(IMPORT_COLUMNS_OPT.getOpt())));
-        }
-        return CSVCommonsLoader.generateColumnInfo(
-                conn, qualifiedTableName, userSuppliedColumnNames, true);
-    }
-
-    /**
-     * Calculate the HBase HTable name for which the import is to be done.
-     *
-     * @param schemaName import schema name, can be null
-     * @param tableName import table name
-     * @return the byte representation of the import HTable
-     */
-    @VisibleForTesting
-    static String getQualifiedTableName(String schemaName, String tableName) {
-        if (schemaName != null) {
-            return String.format("%s.%s", SchemaUtil.normalizeIdentifier(schemaName),
-                    SchemaUtil.normalizeIdentifier(tableName));
-        } else {
-            return SchemaUtil.normalizeIdentifier(tableName);
-        }
-    }
-
-    /**
-     * Set configuration values based on parsed command line options.
-     *
-     * @param cmdLine supplied command line options
-     * @param importColumns descriptors of columns to be imported
-     * @param conf job configuration
-     */
-    private static void configureOptions(CommandLine cmdLine, List<ColumnInfo> importColumns,
-            Configuration conf) throws SQLException {
+    protected void configureOptions(CommandLine cmdLine, List<ColumnInfo> importColumns,
+                                         Configuration conf) throws SQLException {
 
         // we don't parse ZK_QUORUM_OPT here because we need it in order to
         // create the connection we need to build importColumns.
@@ -408,178 +81,23 @@ public class CsvBulkLoadTool extends Configured implements Tool {
 
         CsvBulkImportUtil.initCsvImportJob(
                 conf,
-                getQualifiedTableName(
-                        cmdLine.getOptionValue(SCHEMA_NAME_OPT.getOpt()),
-                        cmdLine.getOptionValue(TABLE_NAME_OPT.getOpt())),
                 delimiterChar,
                 quoteChar,
                 escapeChar,
-                cmdLine.getOptionValue(ARRAY_DELIMITER_OPT.getOpt()),
-                importColumns,
-                cmdLine.hasOption(IGNORE_ERRORS_OPT.getOpt()));
+                cmdLine.getOptionValue(ARRAY_DELIMITER_OPT.getOpt()));
     }
 
-    /**
-     * Perform any required validation on the table being bulk loaded into:
-     * - ensure no column family names start with '_', as they'd be ignored leading to problems.
-     * @throws java.sql.SQLException
-     */
-    private void validateTable(Connection conn, String schemaName,
-            String tableName) throws SQLException {
-
-        ResultSet rs = conn.getMetaData().getColumns(
-                null, StringUtil.escapeLike(schemaName),
-                StringUtil.escapeLike(tableName), null);
-        while (rs.next()) {
-            String familyName = rs.getString(PhoenixDatabaseMetaData.COLUMN_FAMILY);
-            if (familyName != null && familyName.startsWith("_")) {
-                if (QueryConstants.DEFAULT_COLUMN_FAMILY.equals(familyName)) {
-                    throw new IllegalStateException(
-                            "CSV Bulk Loader error: All column names that are not part of the " +
-                                    "primary key constraint must be prefixed with a column family " +
-                                    "name (i.e. f.my_column VARCHAR)");
-                } else {
-                    throw new IllegalStateException("CSV Bulk Loader error: Column family name " +
-                            "must not start with '_': " + familyName);
-                }
-            }
-        }
-        rs.close();
-    }
-    
-    /**
-     * Get the index tables of current data table
-     * @throws java.sql.SQLException
-     */
-    private List<TargetTableRef> getIndexTables(Connection conn, String schemaName, String qualifiedTableName)
-        throws SQLException {
-        PTable table = PhoenixRuntime.getTable(conn, qualifiedTableName);
-        List<TargetTableRef> indexTables = new ArrayList<TargetTableRef>();
-        for(PTable indexTable : table.getIndexes()){
-            if (indexTable.getIndexType() == IndexType.LOCAL) {
-                throw new UnsupportedOperationException("Local indexes not supported by CSV Bulk Loader");
-                /*indexTables.add(
-                        new TargetTableRef(getQualifiedTableName(schemaName,
-                                indexTable.getTableName().getString()),
-                                MetaDataUtil.getLocalIndexTableName(qualifiedTableName))); */
-            } else {
-                indexTables.add(new TargetTableRef(getQualifiedTableName(schemaName,
-                        indexTable.getTableName().getString())));
-            }
+    @Override
+    protected void setupJob(Job job) {
+        // Allow overriding the job jar setting by using a -D system property at startup
+        if (job.getJar() == null) {
+            job.setJarByClass(CsvToKeyValueMapper.class);
         }
-        return indexTables;
+        job.setMapperClass(CsvToKeyValueMapper.class);
     }
 
-    /**
-     * Represents the logical and physical name of a single table to which data is to be loaded.
-     *
-     * This class exists to allow for the difference between HBase physical table names and
-     * Phoenix logical table names.
-     */
-     static class TargetTableRef {
-
-        @JsonProperty 
-        private final String logicalName;
-        
-        @JsonProperty
-        private final String physicalName;
-        
-        @JsonProperty
-        private Map<String,String> configuration = Maps.newHashMap();
-
-        private TargetTableRef(String name) {
-            this(name, name);
-        }
-
-        @JsonCreator
-        private TargetTableRef(@JsonProperty("logicalName") String logicalName, @JsonProperty("physicalName") String physicalName) {
-            this.logicalName = logicalName;
-            this.physicalName = physicalName;
-        }
-
-        public String getLogicalName() {
-            return logicalName;
-        }
-
-        public String getPhysicalName() {
-            return physicalName;
-        }
-        
-        public Map<String, String> getConfiguration() {
-            return configuration;
-        }
-
-        public void setConfiguration(Map<String, String> configuration) {
-            this.configuration = configuration;
-        }
+    public static void main(String[] args) throws Exception {
+        int exitStatus = ToolRunner.run(new CsvBulkLoadTool(), args);
+        System.exit(exitStatus);
     }
-
-     /**
-      * Utility functions to get/put json.
-      * 
-      */
-     static class TargetTableRefFunctions {
-         
-         public static Function<TargetTableRef,String> TO_JSON =  new Function<TargetTableRef,String>() {
-
-             @Override
-             public String apply(TargetTableRef input) {
-                 try {
-                     ObjectMapper mapper = new ObjectMapper();
-                     return mapper.writeValueAsString(input);
-                 } catch (IOException e) {
-                     throw new RuntimeException(e);
-                 }
-                 
-             }
-         };
-         
-         public static Function<String,TargetTableRef> FROM_JSON =  new Function<String,TargetTableRef>() {
-
-             @Override
-             public TargetTableRef apply(String json) {
-                 try {
-                     ObjectMapper mapper = new ObjectMapper();
-                     return mapper.readValue(json, TargetTableRef.class);
-                 } catch (IOException e) {
-                     throw new RuntimeException(e);
-                 }
-                 
-             }
-         };
-         
-         public static Function<List<TargetTableRef>,String> NAMES_TO_JSON =  new Function<List<TargetTableRef>,String>() {
-
-             @Override
-             public String apply(List<TargetTableRef> input) {
-                 try {
-                     List<String> tableNames = Lists.newArrayListWithCapacity(input.size());
-                     for(TargetTableRef table : input) {
-                         tableNames.add(table.getPhysicalName());
-                     }
-                     ObjectMapper mapper = new ObjectMapper();
-                     return mapper.writeValueAsString(tableNames);
-                 } catch (IOException e) {
-                     throw new RuntimeException(e);
-                 }
-                 
-             }
-         };
-         
-         public static Function<String,List<String>> NAMES_FROM_JSON =  new Function<String,List<String>>() {
-
-             @SuppressWarnings("unchecked")
-             @Override
-             public List<String> apply(String json) {
-                 try {
-                     ObjectMapper mapper = new ObjectMapper();
-                     return mapper.readValue(json, ArrayList.class);
-                 } catch (IOException e) {
-                     throw new RuntimeException(e);
-                 }
-                 
-             }
-         };
-     }
-    
 }
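
After this refactor, CSV-specific job configuration lives in CsvBulkImportUtil (field delimiter, quote, escape, array delimiter), while the column list, error-handling flag and target table name go through the shared FormatToKeyValueMapper keys set by AbstractBulkLoadTool.loadData. Below is a hedged sketch of the equivalent standalone calls, assuming a caller in the same package wanted to prepare a Configuration by hand; the helper class, table name and flag value are illustrative, not part of the commit.

    package org.apache.phoenix.mapreduce;

    import java.util.List;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.phoenix.util.ColumnInfo;

    // Illustrative helper, not committed code.
    public class CsvImportConfigSketch {
        public static void configure(Configuration conf, String tableName,
                                     List<ColumnInfo> importColumns) {
            // CSV formatting options only, per the slimmed-down initCsvImportJob signature.
            CsvBulkImportUtil.initCsvImportJob(conf, ',', '"', '\\', null);
            // Column metadata, error handling and the target table now use the shared mapper keys.
            FormatToKeyValueMapper.configureColumnInfoList(conf, importColumns);
            conf.setBoolean(FormatToKeyValueMapper.IGNORE_INVALID_ROW_CONFKEY, false);
            conf.set(FormatToKeyValueMapper.TABLE_NAME_CONFKEY, tableName);
        }
    }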

http://git-wip-us.apache.org/repos/asf/phoenix/blob/578979a1/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java
index c3b5a7d..5a5d378 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java
@@ -19,46 +19,20 @@ package org.apache.phoenix.mapreduce;
 
 import java.io.IOException;
 import java.io.StringReader;
-import java.sql.SQLException;
-import java.util.Iterator;
 import java.util.List;
-import java.util.Map.Entry;
-import java.util.Properties;
-
-import javax.annotation.Nullable;
 
 import org.apache.commons.csv.CSVFormat;
 import org.apache.commons.csv.CSVParser;
 import org.apache.commons.csv.CSVRecord;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.phoenix.jdbc.PhoenixConnection;
-import org.apache.phoenix.mapreduce.CsvBulkLoadTool.TargetTableRefFunctions;
-import org.apache.phoenix.mapreduce.bulkload.CsvTableRowkeyPair;
-import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
 import org.apache.phoenix.util.CSVCommonsLoader;
 import org.apache.phoenix.util.ColumnInfo;
-import org.apache.phoenix.util.PhoenixRuntime;
-import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.UpsertExecutor;
 import org.apache.phoenix.util.csv.CsvUpsertExecutor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
-import com.google.common.base.Splitter;
-import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
 
 /**
  * MapReduce mapper that converts CSV input lines into KeyValues that can be written to HFiles.
@@ -67,12 +41,7 @@ import com.google.common.collect.Lists;
  * extracting the created KeyValues and rolling back the statement execution before it is
  * committed to HBase.
  */
-public class CsvToKeyValueMapper extends Mapper<LongWritable,Text,CsvTableRowkeyPair,
-        KeyValue> {
-
-    private static final Logger LOG = LoggerFactory.getLogger(CsvToKeyValueMapper.class);
-
-    private static final String COUNTER_GROUP_NAME = "Phoenix MapReduce Import";
+public class CsvToKeyValueMapper extends FormatToKeyValueMapper<CSVRecord> {
 
     /** Configuration key for the field delimiter for input csv records */
     public static final String FIELD_DELIMITER_CONFKEY = "phoenix.mapreduce.import.fielddelimiter";
@@ -86,117 +55,25 @@ public class CsvToKeyValueMapper extends Mapper<LongWritable,Text,CsvTableRowkey
     /** Configuration key for the array element delimiter for input arrays */
     public static final String ARRAY_DELIMITER_CONFKEY = "phoenix.mapreduce.import.arraydelimiter";
 
-    /** Configuration key for the name of the output table */
-    public static final String TABLE_NAME_CONFKEY = "phoenix.mapreduce.import.tablename";
-    
-    /** Configuration key for the name of the output index table */
-    public static final String INDEX_TABLE_NAME_CONFKEY = "phoenix.mapreduce.import.indextablename";
-
-    /** Configuration key for the columns to be imported */
-    public static final String COLUMN_INFO_CONFKEY = "phoenix.mapreduce.import.columninfos";
+    private CsvLineParser lineParser;
 
-    /** Configuration key for the flag to ignore invalid rows */
-    public static final String IGNORE_INVALID_ROW_CONFKEY = "phoenix.mapreduce.import.ignoreinvalidrow";
-    
-    /** Configuration key for the table names */
-    public static final String TABLE_NAMES_CONFKEY = "phoenix.mapreduce.import.tablenames";
-    
-    /** Configuration key for the table configurations */
-    public static final String TABLE_CONFIG_CONFKEY = "phoenix.mapreduce.import.table.config";
-
-    private PhoenixConnection conn;
-    private CsvUpsertExecutor csvUpsertExecutor;
-    private MapperUpsertListener upsertListener;
-    private CsvLineParser csvLineParser;
-    private ImportPreUpsertKeyValueProcessor preUpdateProcessor;
-    private List<String> tableNames;
+    @Override
+    protected LineParser<CSVRecord> getLineParser() {
+        return lineParser;
+    }
 
     @Override
     protected void setup(Context context) throws IOException, InterruptedException {
-
         Configuration conf = context.getConfiguration();
-
-        // pass client configuration into driver
-        Properties clientInfos = new Properties();
-        Iterator<Entry<String, String>> iterator = conf.iterator();
-        while(iterator.hasNext()) {
-            Entry<String,String> entry = iterator.next();
-            clientInfos.setProperty(entry.getKey(), entry.getValue());
-        }
-        
-        try {
-            conn = (PhoenixConnection) QueryUtil.getConnection(clientInfos, conf);
-        } catch (SQLException | ClassNotFoundException e) {
-            throw new RuntimeException(e);
-        }
-
-        final String tableNamesConf = conf.get(TABLE_NAMES_CONFKEY);
-        tableNames = TargetTableRefFunctions.NAMES_FROM_JSON.apply(tableNamesConf);
-        
-        upsertListener = new MapperUpsertListener(
-                context, conf.getBoolean(IGNORE_INVALID_ROW_CONFKEY, true));
-        csvUpsertExecutor = buildUpsertExecutor(conf);
-        csvLineParser = new CsvLineParser(
+        lineParser = new CsvLineParser(
                 CsvBulkImportUtil.getCharacter(conf, FIELD_DELIMITER_CONFKEY),
                 CsvBulkImportUtil.getCharacter(conf, QUOTE_CHAR_CONFKEY),
                 CsvBulkImportUtil.getCharacter(conf, ESCAPE_CHAR_CONFKEY));
-
-        preUpdateProcessor = PhoenixConfigurationUtil.loadPreUpsertProcessor(conf);
-    }
-
-    @SuppressWarnings("deprecation")
-    @Override
-    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
-        try {
-            CSVRecord csvRecord = null;
-            try {
-                csvRecord = csvLineParser.parse(value.toString());
-            } catch (IOException e) {
-                context.getCounter(COUNTER_GROUP_NAME, "CSV Parser errors").increment(1L);
-            }
-
-            if (csvRecord == null) {
-                context.getCounter(COUNTER_GROUP_NAME, "Empty records").increment(1L);
-                return;
-            }
-            csvUpsertExecutor.execute(ImmutableList.of(csvRecord));
-
-            Iterator<Pair<byte[], List<KeyValue>>> uncommittedDataIterator
-                    = PhoenixRuntime.getUncommittedDataIterator(conn, true);
-            while (uncommittedDataIterator.hasNext()) {
-                Pair<byte[], List<KeyValue>> kvPair = uncommittedDataIterator.next();
-                List<KeyValue> keyValueList = kvPair.getSecond();
-                keyValueList = preUpdateProcessor.preUpsert(kvPair.getFirst(), keyValueList);
-                byte[] first = kvPair.getFirst();
-                for(String tableName : tableNames) {
-                    if (Bytes.compareTo(Bytes.toBytes(tableName), first) != 0) {
-                        // skip edits for other tables
-                        continue;
-                    }  
-                    for (KeyValue kv : keyValueList) {
-                        ImmutableBytesWritable outputKey = new ImmutableBytesWritable();
-                        outputKey.set(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength());
-                        context.write(new CsvTableRowkeyPair(tableName, outputKey), kv);
-                    }
-                }
-            }
-            conn.rollback();
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    @Override
-    protected void cleanup(Context context) throws IOException, InterruptedException {
-        try {
-            conn.close();
-        } catch (SQLException e) {
-            throw new RuntimeException(e);
-        }
     }
 
     @VisibleForTesting
-    CsvUpsertExecutor buildUpsertExecutor(Configuration conf) {
+    @Override
+    protected UpsertExecutor<CSVRecord, ?> buildUpsertExecutor(Configuration conf) {
         String tableName = conf.get(TABLE_NAME_CONFKEY);
         String arraySeparator = conf.get(ARRAY_DELIMITER_CONFKEY,
                                             CSVCommonsLoader.DEFAULT_ARRAY_ELEMENT_SEPARATOR);
@@ -204,78 +81,14 @@ public class CsvToKeyValueMapper extends Mapper<LongWritable,Text,CsvTableRowkey
 
         List<ColumnInfo> columnInfoList = buildColumnInfoList(conf);
 
-        return CsvUpsertExecutor.create(conn, tableName, columnInfoList, upsertListener, arraySeparator);
-    }
-
-    /**
-     * Write the list of to-import columns to a job configuration.
-     *
-     * @param conf configuration to be written to
-     * @param columnInfoList list of ColumnInfo objects to be configured for import
-     */
-    @VisibleForTesting
-    static void configureColumnInfoList(Configuration conf, List<ColumnInfo> columnInfoList) {
-        conf.set(COLUMN_INFO_CONFKEY, Joiner.on("|").useForNull("").join(columnInfoList));
-    }
-
-    /**
-     * Build the list of ColumnInfos for the import based on information in the configuration.
-     */
-    @VisibleForTesting
-    static List<ColumnInfo> buildColumnInfoList(Configuration conf) {
-
-        return Lists.newArrayList(
-                Iterables.transform(
-                        Splitter.on("|").split(conf.get(COLUMN_INFO_CONFKEY)),
-                        new Function<String, ColumnInfo>() {
-                            @Nullable
-                            @Override
-                            public ColumnInfo apply(@Nullable String input) {
-                                if (input.isEmpty()) {
-                                    // An empty string represents a null that was passed in to
-                                    // the configuration, which corresponds to an input column
-                                    // which is to be skipped
-                                    return null;
-                                }
-                                return ColumnInfo.fromString(input);
-                            }
-                        }));
-    }
-
-    /**
-     * Listener that logs successful upserts and errors to job counters.
-     */
-    @VisibleForTesting
-    static class MapperUpsertListener implements CsvUpsertExecutor.UpsertListener {
-
-        private final Context context;
-        private final boolean ignoreRecordErrors;
-
-        private MapperUpsertListener(Context context, boolean ignoreRecordErrors) {
-            this.context = context;
-            this.ignoreRecordErrors = ignoreRecordErrors;
-        }
-
-        @Override
-        public void upsertDone(long upsertCount) {
-            context.getCounter(COUNTER_GROUP_NAME, "Upserts Done").increment(1L);
-        }
-
-        @Override
-        public void errorOnRecord(CSVRecord csvRecord, Throwable throwable) {
-            LOG.error("Error on record " + csvRecord, throwable);
-            context.getCounter(COUNTER_GROUP_NAME, "Errors on records").increment(1L);
-            if (!ignoreRecordErrors) {
-                throw Throwables.propagate(throwable);
-            }
-        }
+        return new CsvUpsertExecutor(conn, tableName, columnInfoList, upsertListener, arraySeparator);
     }
 
     /**
      * Parses a single CSV input line, returning a {@code CSVRecord}.
      */
     @VisibleForTesting
-    static class CsvLineParser {
+    static class CsvLineParser implements LineParser<CSVRecord> {
         private final CSVFormat csvFormat;
 
         CsvLineParser(char fieldDelimiter, char quote, char escape) {
@@ -286,6 +99,7 @@ public class CsvToKeyValueMapper extends Mapper<LongWritable,Text,CsvTableRowkey
                     .withQuote(quote);
         }
 
+        @Override
         public CSVRecord parse(String input) throws IOException {
             // TODO Creating a new parser for each line seems terribly inefficient but
             // there's no public way to parse single lines via commons-csv. We should update
@@ -294,18 +108,4 @@ public class CsvToKeyValueMapper extends Mapper<LongWritable,Text,CsvTableRowkey
             return Iterables.getFirst(csvParser, null);
         }
     }
-
-    /**
-     * A default implementation of {@code ImportPreUpsertKeyValueProcessor} that is used if no
-     * specific class is configured. This implementation simply passes through the KeyValue
-     * list that is passed in.
-     */
-    public static class DefaultImportPreUpsertKeyValueProcessor implements
-            ImportPreUpsertKeyValueProcessor {
-
-        @Override
-        public List<KeyValue> preUpsert(byte[] rowKey, List<KeyValue> keyValues) {
-            return keyValues;
-        }
-    }
 }

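For illustration only, not part of this patch: the single-line parsing done by CsvLineParser can be reproduced with commons-csv directly. The sketch below hard-codes the delimiter, quote, and escape characters that the mapper normally reads from the job configuration, and creates a throwaway parser per line, the same inefficiency the TODO above calls out.

    import java.io.IOException;
    import java.io.StringReader;

    import org.apache.commons.csv.CSVFormat;
    import org.apache.commons.csv.CSVParser;
    import org.apache.commons.csv.CSVRecord;

    import com.google.common.collect.Iterables;

    public class SingleCsvLineParseSketch {

        public static CSVRecord parseLine(String line) throws IOException {
            // Mirrors CsvLineParser: one CSVFormat configured up front, a fresh parser per line.
            CSVFormat format = CSVFormat.DEFAULT
                    .withDelimiter(',')
                    .withEscape('\\')
                    .withQuote('"');
            CSVParser parser = new CSVParser(new StringReader(line), format);
            try {
                // A single input line yields at most one record.
                return Iterables.getFirst(parser, null);
            } finally {
                parser.close();
            }
        }

        public static void main(String[] args) throws IOException {
            CSVRecord record = parseLine("1,hello,\"a,b,c\"");
            System.out.println(record.get(0) + " | " + record.get(1) + " | " + record.get(2));
        }
    }
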
http://git-wip-us.apache.org/repos/asf/phoenix/blob/578979a1/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueReducer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueReducer.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueReducer.java
deleted file mode 100644
index 7e9c4fd..0000000
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueReducer.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.mapreduce;
-
-import java.io.IOException;
-import java.util.TreeSet;
-
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.phoenix.mapreduce.bulkload.CsvTableRowkeyPair;
-
-/**
- * Reducer class for the CSVBulkLoad job. 
- * Performs similar functionality to {@link KeyValueSortReducer}
- * 
- */
-public class CsvToKeyValueReducer extends Reducer<CsvTableRowkeyPair,KeyValue,CsvTableRowkeyPair,KeyValue> {
-    
-    @Override
-    protected void reduce(CsvTableRowkeyPair key, Iterable<KeyValue> values,
-            Reducer<CsvTableRowkeyPair, KeyValue, CsvTableRowkeyPair, KeyValue>.Context context)
-                    throws IOException, InterruptedException {
-        TreeSet<KeyValue> map = new TreeSet<KeyValue>(KeyValue.COMPARATOR);
-        for (KeyValue kv: values) {
-          try {
-            map.add(kv.clone());
-          } catch (CloneNotSupportedException e) {
-            throw new java.io.IOException(e);
-          }
-        }
-        context.setStatus("Read " + map.getClass());
-        int index = 0;
-        for (KeyValue kv: map) {
-          context.write(key, kv);
-          if (++index % 100 == 0) context.setStatus("Wrote " + index);
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/578979a1/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueMapper.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueMapper.java
new file mode 100644
index 0000000..b2e99e5
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueMapper.java
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import javax.annotation.Nullable;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.mapreduce.bulkload.TableRowkeyPair;
+import org.apache.phoenix.mapreduce.bulkload.TargetTableRefFunctions;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.util.ColumnInfo;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.UpsertExecutor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.base.Splitter;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+
+/**
+ * Base class for converting some input source format into {@link KeyValue}s of a target
+ * schema. Assumes input format is text-based, with one row per line. Depends on an online cluster
+ * to retrieve {@link ColumnInfo} from the target table.
+ */
+public abstract class FormatToKeyValueMapper<RECORD> extends Mapper<LongWritable, Text, TableRowkeyPair,
+        KeyValue> {
+
+    protected static final Logger LOG = LoggerFactory.getLogger(FormatToKeyValueMapper.class);
+
+    protected static final String COUNTER_GROUP_NAME = "Phoenix MapReduce Import";
+
+    /** Configuration key for the name of the output table */
+    public static final String TABLE_NAME_CONFKEY = "phoenix.mapreduce.import.tablename";
+
+    /** Configuration key for the name of the output index table */
+    public static final String INDEX_TABLE_NAME_CONFKEY = "phoenix.mapreduce.import.indextablename";
+
+    /** Configuration key for the columns to be imported */
+    public static final String COLUMN_INFO_CONFKEY = "phoenix.mapreduce.import.columninfos";
+
+    /** Configuration key for the flag to ignore invalid rows */
+    public static final String IGNORE_INVALID_ROW_CONFKEY = "phoenix.mapreduce.import.ignoreinvalidrow";
+
+    /** Configuration key for the table names */
+    public static final String TABLE_NAMES_CONFKEY = "phoenix.mapreduce.import.tablenames";
+
+    /** Configuration key for the table configurations */
+    public static final String TABLE_CONFIG_CONFKEY = "phoenix.mapreduce.import.table.config";
+
+    /**
+     * Parses a single input line, returning a {@code T}.
+     */
+    public interface LineParser<T> {
+        T parse(String input) throws IOException;
+    }
+
+    protected PhoenixConnection conn;
+    protected UpsertExecutor<RECORD, ?> upsertExecutor;
+    protected ImportPreUpsertKeyValueProcessor preUpdateProcessor;
+    protected List<String> tableNames;
+    protected MapperUpsertListener<RECORD> upsertListener;
+
+    protected abstract UpsertExecutor<RECORD,?> buildUpsertExecutor(Configuration conf);
+    protected abstract LineParser<RECORD> getLineParser();
+
+    @Override
+    protected void setup(Context context) throws IOException, InterruptedException {
+
+        Configuration conf = context.getConfiguration();
+
+        // pass client configuration into driver
+        Properties clientInfos = new Properties();
+        for (Map.Entry<String, String> entry : conf) {
+            clientInfos.setProperty(entry.getKey(), entry.getValue());
+        }
+
+        try {
+            conn = (PhoenixConnection) QueryUtil.getConnection(clientInfos, conf);
+        } catch (SQLException | ClassNotFoundException e) {
+            throw new RuntimeException(e);
+        }
+
+        final String tableNamesConf = conf.get(TABLE_NAMES_CONFKEY);
+        tableNames = TargetTableRefFunctions.NAMES_FROM_JSON.apply(tableNamesConf);
+
+        upsertListener = new MapperUpsertListener<RECORD>(
+                context, conf.getBoolean(IGNORE_INVALID_ROW_CONFKEY, true));
+        upsertExecutor = buildUpsertExecutor(conf);
+        preUpdateProcessor = PhoenixConfigurationUtil.loadPreUpsertProcessor(conf);
+    }
+
+    @SuppressWarnings("deprecation")
+    @Override
+    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
+        try {
+            RECORD record = null;
+            try {
+                record = getLineParser().parse(value.toString());
+            } catch (IOException e) {
+                context.getCounter(COUNTER_GROUP_NAME, "Parser errors").increment(1L);
+                return;
+            }
+
+            if (record == null) {
+                context.getCounter(COUNTER_GROUP_NAME, "Empty records").increment(1L);
+                return;
+            }
+            upsertExecutor.execute(ImmutableList.<RECORD>of(record));
+
+            Iterator<Pair<byte[], List<KeyValue>>> uncommittedDataIterator
+                    = PhoenixRuntime.getUncommittedDataIterator(conn, true);
+            while (uncommittedDataIterator.hasNext()) {
+                Pair<byte[], List<KeyValue>> kvPair = uncommittedDataIterator.next();
+                List<KeyValue> keyValueList = kvPair.getSecond();
+                keyValueList = preUpdateProcessor.preUpsert(kvPair.getFirst(), keyValueList);
+                byte[] first = kvPair.getFirst();
+                for (String tableName : tableNames) {
+                    if (Bytes.compareTo(Bytes.toBytes(tableName), first) != 0) {
+                        // skip edits for other tables
+                        continue;
+                    }
+                    for (KeyValue kv : keyValueList) {
+                        ImmutableBytesWritable outputKey = new ImmutableBytesWritable();
+                        outputKey.set(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength());
+                        context.write(new TableRowkeyPair(tableName, outputKey), kv);
+                    }
+                }
+            }
+            conn.rollback();
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    protected void cleanup(Context context) throws IOException, InterruptedException {
+        try {
+            conn.close();
+        } catch (SQLException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Write the list of to-import columns to a job configuration.
+     *
+     * @param conf configuration to be written to
+     * @param columnInfoList list of ColumnInfo objects to be configured for import
+     */
+    @VisibleForTesting
+    static void configureColumnInfoList(Configuration conf, List<ColumnInfo> columnInfoList) {
+        conf.set(COLUMN_INFO_CONFKEY, Joiner.on("|").useForNull("").join(columnInfoList));
+    }
+
+    /**
+     * Build the list of ColumnInfos for the import based on information in the configuration.
+     */
+    @VisibleForTesting
+    static List<ColumnInfo> buildColumnInfoList(Configuration conf) {
+
+        return Lists.newArrayList(
+                Iterables.transform(
+                        Splitter.on("|").split(conf.get(COLUMN_INFO_CONFKEY)),
+                        new Function<String, ColumnInfo>() {
+                            @Nullable
+                            @Override
+                            public ColumnInfo apply(@Nullable String input) {
+                                if (input == null || input.isEmpty()) {
+                                    // An empty string represents a null that was passed in to
+                                    // the configuration, which corresponds to an input column
+                                    // which is to be skipped
+                                    return null;
+                                }
+                                return ColumnInfo.fromString(input);
+                            }
+                        }));
+    }
+
+    /**
+     * Listener that logs successful upserts and errors to job counters.
+     */
+    @VisibleForTesting
+    static class MapperUpsertListener<T> implements UpsertExecutor.UpsertListener<T> {
+
+        private final Mapper<LongWritable, Text, TableRowkeyPair, KeyValue>.Context context;
+        private final boolean ignoreRecordErrors;
+
+        private MapperUpsertListener(
+                Mapper<LongWritable, Text, TableRowkeyPair, KeyValue>.Context context,
+                boolean ignoreRecordErrors) {
+            this.context = context;
+            this.ignoreRecordErrors = ignoreRecordErrors;
+        }
+
+        @Override
+        public void upsertDone(long upsertCount) {
+            context.getCounter(COUNTER_GROUP_NAME, "Upserts Done").increment(1L);
+        }
+
+        @Override
+        public void errorOnRecord(T record, Throwable throwable) {
+            LOG.error("Error on record " + record, throwable);
+            context.getCounter(COUNTER_GROUP_NAME, "Errors on records").increment(1L);
+            if (!ignoreRecordErrors) {
+                throw Throwables.propagate(throwable);
+            }
+        }
+    }
+
+    /**
+     * A default implementation of {@code ImportPreUpsertKeyValueProcessor} that is used if no
+     * specific class is configured. This implementation simply passes through the KeyValue
+     * list that is passed in.
+     */
+    public static class DefaultImportPreUpsertKeyValueProcessor implements
+            ImportPreUpsertKeyValueProcessor {
+
+        @Override
+        public List<KeyValue> preUpsert(byte[] rowKey, List<KeyValue> keyValues) {
+            return keyValues;
+        }
+    }
+}

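The core of the new base class is the map() flow shown above: execute the UPSERT on a Phoenix connection, pull the generated cells back out with PhoenixRuntime.getUncommittedDataIterator(), and roll the statement back so nothing is ever committed to HBase. Below is a minimal standalone sketch of the same trick outside MapReduce; the JDBC URL, table, and columns are placeholders, and it assumes a reachable cluster.

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.util.Iterator;
    import java.util.List;

    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.util.Pair;
    import org.apache.phoenix.util.PhoenixRuntime;

    public class UncommittedKeyValueSketch {

        public static void main(String[] args) throws Exception {
            // Placeholder connection string and schema; adjust for a real cluster.
            Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost");
            // Phoenix connections default to auto-commit off; stated here for clarity.
            conn.setAutoCommit(false);
            try {
                conn.createStatement().executeUpdate(
                        "UPSERT INTO EXAMPLE_TABLE (ID, NAME) VALUES (1, 'alice')");

                // The cells the UPSERT would have written, grouped by table, before any commit.
                Iterator<Pair<byte[], List<KeyValue>>> it =
                        PhoenixRuntime.getUncommittedDataIterator(conn, true);
                while (it.hasNext()) {
                    Pair<byte[], List<KeyValue>> tableKvs = it.next();
                    System.out.println(tableKvs.getSecond().size() + " KeyValues generated");
                }

                // Discard the pending mutations; the mapper writes the KeyValues to HFiles instead.
                conn.rollback();
            } finally {
                conn.close();
            }
        }
    }
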
http://git-wip-us.apache.org/repos/asf/phoenix/blob/578979a1/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java
new file mode 100644
index 0000000..5d00656
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import java.io.IOException;
+import java.util.TreeSet;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.phoenix.mapreduce.bulkload.TableRowkeyPair;
+
+/**
+ * Reducer class for the bulkload jobs.
+ * Performs similar functionality to {@link KeyValueSortReducer}
+ */
+public class FormatToKeyValueReducer
+    extends Reducer<TableRowkeyPair,KeyValue,TableRowkeyPair,KeyValue> {
+
+    @Override
+    protected void reduce(TableRowkeyPair key, Iterable<KeyValue> values,
+        Reducer<TableRowkeyPair, KeyValue, TableRowkeyPair, KeyValue>.Context context)
+        throws IOException, InterruptedException {
+        TreeSet<KeyValue> map = new TreeSet<KeyValue>(KeyValue.COMPARATOR);
+        for (KeyValue kv: values) {
+            try {
+                map.add(kv.clone());
+            } catch (CloneNotSupportedException e) {
+                throw new java.io.IOException(e);
+            }
+        }
+        context.setStatus("Read " + map.getClass());
+        int index = 0;
+        for (KeyValue kv: map) {
+            context.write(key, kv);
+            if (++index % 100 == 0) context.setStatus("Wrote " + index);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/578979a1/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/ImportPreUpsertKeyValueProcessor.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/ImportPreUpsertKeyValueProcessor.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/ImportPreUpsertKeyValueProcessor.java
index 62211f3..dff9ef2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/ImportPreUpsertKeyValueProcessor.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/ImportPreUpsertKeyValueProcessor.java
@@ -24,7 +24,8 @@ import java.util.List;
 /**
  * A listener hook to process KeyValues that are being written to HFiles for bulk import.
  * Implementing this interface and configuring it via the {@link
- * CsvToKeyValueMapper#UPSERT_HOOK_CLASS_CONFKEY} configuration key.
+ * org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil#UPSERT_HOOK_CLASS_CONFKEY}
+ * configuration key enables this hook for bulk load jobs.
  * <p/>
 * The intention of such a hook is to allow coprocessor-style operations to be performed on
  * data that is being bulk-loaded via MapReduce.

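A hedged sketch of a custom hook follows; the class name and the counting behaviour are invented for illustration, and only the preUpsert signature comes from the interface above. Such a processor is picked up in the mapper's setup() via PhoenixConfigurationUtil.loadPreUpsertProcessor(conf), keyed on the UPSERT_HOOK_CLASS_CONFKEY setting referenced in the javadoc.

    import java.util.List;

    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.phoenix.mapreduce.ImportPreUpsertKeyValueProcessor;

    import com.google.common.collect.Lists;

    // Hypothetical hook: pass the KeyValues through unchanged while counting them.
    // A real implementation might redact cells, rewrite timestamps, or gather metrics.
    public class CountingPreUpsertProcessor implements ImportPreUpsertKeyValueProcessor {

        private long totalKeyValues = 0;

        @Override
        public List<KeyValue> preUpsert(byte[] rowKey, List<KeyValue> keyValues) {
            totalKeyValues += keyValues.size();
            // Returning a copy keeps the caller's list untouched; returning keyValues
            // directly, as DefaultImportPreUpsertKeyValueProcessor does, is also fine.
            return Lists.newArrayList(keyValues);
        }
    }
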
http://git-wip-us.apache.org/repos/asf/phoenix/blob/578979a1/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/JsonBulkLoadTool.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/JsonBulkLoadTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/JsonBulkLoadTool.java
new file mode 100644
index 0000000..1bea3f0
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/JsonBulkLoadTool.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.phoenix.util.ColumnInfo;
+
+/**
+ * A tool for running MapReduce-based ingests of JSON data. Nested JSON data structures are not
+ * handled, though lists are converted into typed ARRAYS.
+ */
+public class JsonBulkLoadTool extends AbstractBulkLoadTool {
+
+    @Override
+    protected void configureOptions(CommandLine cmdLine, List<ColumnInfo> importColumns,
+                                         Configuration conf) throws SQLException {
+        // noop
+    }
+
+    @Override
+    protected void setupJob(Job job) {
+        // Allow overriding the job jar setting by using a -D system property at startup
+        if (job.getJar() == null) {
+            job.setJarByClass(JsonToKeyValueMapper.class);
+        }
+        job.setMapperClass(JsonToKeyValueMapper.class);
+    }
+
+    public static void main(String[] args) throws Exception {
+        ToolRunner.run(new JsonBulkLoadTool(), args);
+    }
+}

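For illustration, not part of the patch: launching the tool programmatically. The --table and --input flags are assumed to be the shared bulk load options inherited from AbstractBulkLoadTool (the same ones the CSV tool accepts), and the table name and HDFS path are placeholders. Each input line is expected to hold one self-contained JSON object, since the underlying mapper is strictly line-oriented.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.util.ToolRunner;
    import org.apache.phoenix.mapreduce.JsonBulkLoadTool;

    public class RunJsonBulkLoad {
        public static void main(String[] args) throws Exception {
            Configuration conf = HBaseConfiguration.create();
            // Placeholder table name and input path.
            int exitCode = ToolRunner.run(conf, new JsonBulkLoadTool(), new String[] {
                    "--table", "EXAMPLE_TABLE",
                    "--input", "/data/example.json" });
            System.exit(exitCode);
        }
    }
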
http://git-wip-us.apache.org/repos/asf/phoenix/blob/578979a1/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/JsonToKeyValueMapper.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/JsonToKeyValueMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/JsonToKeyValueMapper.java
new file mode 100644
index 0000000..5173a0e
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/JsonToKeyValueMapper.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.phoenix.util.ColumnInfo;
+import org.apache.phoenix.util.UpsertExecutor;
+import org.apache.phoenix.util.json.JsonUpsertExecutor;
+import org.codehaus.jackson.map.ObjectMapper;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+/**
+ * MapReduce mapper that converts JSON input lines into KeyValues that can be written to HFiles.
+ * <p/>
+ * KeyValues are produced by executing UPSERT statements on a Phoenix connection and then
+ * extracting the created KeyValues and rolling back the statement execution before it is
+ * committed to HBase.
+ */
+public class JsonToKeyValueMapper extends FormatToKeyValueMapper<Map<?, ?>> {
+
+    private LineParser<Map<?, ?>> lineParser;
+
+    @Override
+    protected LineParser<Map<?, ?>> getLineParser() {
+        return lineParser;
+    }
+
+    @Override
+    protected void setup(Context context) throws IOException, InterruptedException {
+        super.setup(context);
+        lineParser = new JsonLineParser();
+    }
+
+    @VisibleForTesting
+    @Override
+    protected UpsertExecutor<Map<?, ?>, ?> buildUpsertExecutor(Configuration conf) {
+        String tableName = conf.get(TABLE_NAME_CONFKEY);
+        Preconditions.checkNotNull(tableName, "table name is not configured");
+
+        List<ColumnInfo> columnInfoList = buildColumnInfoList(conf);
+
+        return new JsonUpsertExecutor(conn, tableName, columnInfoList, upsertListener);
+    }
+
+    @VisibleForTesting
+    static class JsonLineParser implements LineParser<Map<?, ?>> {
+        private final ObjectMapper mapper = new ObjectMapper();
+
+        @Override
+        public Map<?, ?> parse(String input) throws IOException {
+            return mapper.readValue(input, Map.class);
+        }
+    }
+}
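
JsonLineParser hands each input line to Jackson as-is. Below is a tiny standalone illustration, not from the patch, of the Map one line produces; the list value is what JsonUpsertExecutor (via ObjectToArrayConverter) would presumably turn into a typed Phoenix ARRAY, and nested objects are not handled.

    import java.io.IOException;
    import java.util.Map;

    import org.codehaus.jackson.map.ObjectMapper;

    public class JsonLineExample {
        public static void main(String[] args) throws IOException {
            ObjectMapper mapper = new ObjectMapper();
            // One input line == one JSON object.
            Map<?, ?> record = mapper.readValue(
                    "{\"ID\": 1, \"NAME\": \"alice\", \"TAGS\": [\"a\", \"b\"]}", Map.class);
            System.out.println(record);   // {ID=1, NAME=alice, TAGS=[a, b]}
        }
    }
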