Posted to commits@phoenix.apache.org by nd...@apache.org on 2015/12/08 00:02:07 UTC
[3/3] phoenix git commit: PHOENIX-2481 JSON bulkload tool
PHOENIX-2481 JSON bulkload tool
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/578979a1
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/578979a1
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/578979a1
Branch: refs/heads/master
Commit: 578979a1437124d53e319ad554b72793bcef6fd3
Parents: 8ce3b58
Author: Nick Dimiduk <nd...@apache.org>
Authored: Mon Nov 16 17:18:34 2015 -0800
Committer: Nick Dimiduk <nd...@apache.org>
Committed: Mon Dec 7 13:09:09 2015 -0800
----------------------------------------------------------------------
.../phoenix/mapreduce/CsvBulkLoadToolIT.java | 16 +-
.../phoenix/mapreduce/AbstractBulkLoadTool.java | 402 +++++++++++++++
.../phoenix/mapreduce/CsvBulkImportUtil.java | 20 +-
.../phoenix/mapreduce/CsvBulkLoadTool.java | 514 +------------------
.../phoenix/mapreduce/CsvToKeyValueMapper.java | 226 +-------
.../phoenix/mapreduce/CsvToKeyValueReducer.java | 55 --
.../mapreduce/FormatToKeyValueMapper.java | 259 ++++++++++
.../mapreduce/FormatToKeyValueReducer.java | 54 ++
.../ImportPreUpsertKeyValueProcessor.java | 3 +-
.../phoenix/mapreduce/JsonBulkLoadTool.java | 53 ++
.../phoenix/mapreduce/JsonToKeyValueMapper.java | 75 +++
.../mapreduce/MultiHfileOutputFormat.java | 38 +-
.../mapreduce/bulkload/CsvTableRowkeyPair.java | 139 -----
.../mapreduce/bulkload/TableRowkeyPair.java | 134 +++++
.../mapreduce/bulkload/TargetTableRef.java | 70 +++
.../bulkload/TargetTableRefFunctions.java | 95 ++++
.../util/PhoenixConfigurationUtil.java | 15 +-
.../apache/phoenix/util/CSVCommonsLoader.java | 160 +-----
.../org/apache/phoenix/util/SchemaUtil.java | 145 +++++-
.../org/apache/phoenix/util/UpsertExecutor.java | 156 ++++++
.../phoenix/util/csv/CsvUpsertExecutor.java | 131 +----
.../phoenix/util/json/JsonUpsertExecutor.java | 209 ++++++++
.../util/json/ObjectToArrayConverter.java | 69 +++
.../phoenix/mapreduce/BulkLoadToolTest.java | 78 +++
.../mapreduce/CsvBulkImportUtilTest.java | 18 +-
.../phoenix/mapreduce/CsvBulkLoadToolTest.java | 69 ---
.../mapreduce/CsvToKeyValueMapperTest.java | 84 +--
.../mapreduce/FormatToKeyValueMapperTest.java | 102 ++++
.../util/AbstractUpsertExecutorTest.java | 136 +++++
.../phoenix/util/csv/CsvUpsertExecutorTest.java | 144 ++----
.../util/json/JsonUpsertExecutorTest.java | 53 ++
31 files changed, 2222 insertions(+), 1500 deletions(-)
----------------------------------------------------------------------
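The new JsonBulkLoadTool appears later in this patch; assuming it is launched through Hadoop's ToolRunner the same way CsvBulkLoadTool.main() below is, a programmatic invocation would look roughly like this sketch (the table name, input path, and ZooKeeper quorum are illustrative values, not taken from the patch):

import org.apache.hadoop.util.ToolRunner;
import org.apache.phoenix.mapreduce.JsonBulkLoadTool;

public class JsonBulkLoadExample {
    public static void main(String[] args) throws Exception {
        // -t and -i are the mandatory table/input options defined in
        // AbstractBulkLoadTool; -z optionally supplies the ZooKeeper quorum.
        String[] toolArgs = new String[] {
                "-t", "EXAMPLE_TABLE",
                "-i", "/data/example.json",
                "-z", "localhost:2181"
        };
        int exitStatus = ToolRunner.run(new JsonBulkLoadTool(), toolArgs);
        System.exit(exitStatus);
    }
}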
http://git-wip-us.apache.org/repos/asf/phoenix/blob/578979a1/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/CsvBulkLoadToolIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/CsvBulkLoadToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/CsvBulkLoadToolIT.java
index 0e74d7b..a5b7488 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/CsvBulkLoadToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/CsvBulkLoadToolIT.java
@@ -17,14 +17,6 @@
*/
package org.apache.phoenix.mapreduce;
-import static org.apache.phoenix.query.BaseTest.setUpConfigForMiniCluster;
-import static org.apache.phoenix.query.QueryServices.DATE_FORMAT_ATTRIB;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
import java.io.PrintWriter;
import java.sql.Connection;
import java.sql.DriverManager;
@@ -47,6 +39,14 @@ import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import static org.apache.phoenix.query.BaseTest.setUpConfigForMiniCluster;
+import static org.apache.phoenix.query.QueryServices.DATE_FORMAT_ATTRIB;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
@Category(NeedsOwnMiniClusterTest.class)
public class CsvBulkLoadToolIT {
http://git-wip-us.apache.org/repos/asf/phoenix/blob/578979a1/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
new file mode 100644
index 0000000..cf9ddef
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
@@ -0,0 +1,402 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.cli.PosixParser;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.jdbc.PhoenixDriver;
+import org.apache.phoenix.mapreduce.bulkload.TableRowkeyPair;
+import org.apache.phoenix.mapreduce.bulkload.TargetTableRef;
+import org.apache.phoenix.mapreduce.bulkload.TargetTableRefFunctions;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.util.ColumnInfo;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.StringUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * Base tool for running MapReduce-based ingests of data.
+ */
+public abstract class AbstractBulkLoadTool extends Configured implements Tool {
+
+ protected static final Logger LOG = LoggerFactory.getLogger(AbstractBulkLoadTool.class);
+
+ static final Option ZK_QUORUM_OPT = new Option("z", "zookeeper", true, "Supply zookeeper connection details (optional)");
+ static final Option INPUT_PATH_OPT = new Option("i", "input", true, "Input path (mandatory)");
+ static final Option OUTPUT_PATH_OPT = new Option("o", "output", true, "Output path for temporary HFiles (optional)");
+ static final Option SCHEMA_NAME_OPT = new Option("s", "schema", true, "Phoenix schema name (optional)");
+ static final Option TABLE_NAME_OPT = new Option("t", "table", true, "Phoenix table name (mandatory)");
+ static final Option INDEX_TABLE_NAME_OPT = new Option("it", "index-table", true, "Phoenix index table name when just loading this particular index table");
+ static final Option IMPORT_COLUMNS_OPT = new Option("c", "import-columns", true, "Comma-separated list of columns to be imported");
+ static final Option IGNORE_ERRORS_OPT = new Option("g", "ignore-errors", false, "Ignore input errors");
+ static final Option HELP_OPT = new Option("h", "help", false, "Show this help and quit");
+
+ /**
+ * Set configuration values based on parsed command line options.
+ *
+ * @param cmdLine supplied command line options
+ * @param importColumns descriptors of columns to be imported
+ * @param conf job configuration
+ */
+ protected abstract void configureOptions(CommandLine cmdLine, List<ColumnInfo> importColumns,
+ Configuration conf) throws SQLException;
+ protected abstract void setupJob(Job job);
+
+ protected Options getOptions() {
+ Options options = new Options();
+ options.addOption(INPUT_PATH_OPT);
+ options.addOption(TABLE_NAME_OPT);
+ options.addOption(INDEX_TABLE_NAME_OPT);
+ options.addOption(ZK_QUORUM_OPT);
+ options.addOption(OUTPUT_PATH_OPT);
+ options.addOption(SCHEMA_NAME_OPT);
+ options.addOption(IMPORT_COLUMNS_OPT);
+ options.addOption(IGNORE_ERRORS_OPT);
+ options.addOption(HELP_OPT);
+ return options;
+ }
+
+ /**
+ * Parses the commandline arguments, throws IllegalStateException if mandatory arguments are
+ * missing.
+ *
+ * @param args supplied command line arguments
+ * @return the parsed command line
+ */
+ protected CommandLine parseOptions(String[] args) {
+
+ Options options = getOptions();
+
+ CommandLineParser parser = new PosixParser();
+ CommandLine cmdLine = null;
+ try {
+ cmdLine = parser.parse(options, args);
+ } catch (ParseException e) {
+ printHelpAndExit("Error parsing command line options: " + e.getMessage(), options);
+ }
+
+ if (cmdLine.hasOption(HELP_OPT.getOpt())) {
+ printHelpAndExit(options, 0);
+ }
+
+ if (!cmdLine.hasOption(TABLE_NAME_OPT.getOpt())) {
+ throw new IllegalStateException(TABLE_NAME_OPT.getLongOpt() + " is a mandatory " +
+ "parameter");
+ }
+
+ if (!cmdLine.getArgList().isEmpty()) {
+ throw new IllegalStateException("Got unexpected extra parameters: "
+ + cmdLine.getArgList());
+ }
+
+ if (!cmdLine.hasOption(INPUT_PATH_OPT.getOpt())) {
+ throw new IllegalStateException(INPUT_PATH_OPT.getLongOpt() + " is a mandatory " +
+ "parameter");
+ }
+
+ return cmdLine;
+ }
+
+ private void printHelpAndExit(String errorMessage, Options options) {
+ System.err.println(errorMessage);
+ printHelpAndExit(options, 1);
+ }
+
+ private void printHelpAndExit(Options options, int exitCode) {
+ HelpFormatter formatter = new HelpFormatter();
+ formatter.printHelp("help", options);
+ System.exit(exitCode);
+ }
+
+ @Override
+ public int run(String[] args) throws Exception {
+
+ Configuration conf = HBaseConfiguration.create(getConf());
+
+ CommandLine cmdLine = null;
+ try {
+ cmdLine = parseOptions(args);
+ } catch (IllegalStateException e) {
+ printHelpAndExit(e.getMessage(), getOptions());
+ }
+ return loadData(conf, cmdLine);
+ }
+
+ private int loadData(Configuration conf, CommandLine cmdLine) throws SQLException,
+ InterruptedException, ExecutionException, ClassNotFoundException {
+ String tableName = cmdLine.getOptionValue(TABLE_NAME_OPT.getOpt());
+ String schemaName = cmdLine.getOptionValue(SCHEMA_NAME_OPT.getOpt());
+ String indexTableName = cmdLine.getOptionValue(INDEX_TABLE_NAME_OPT.getOpt());
+ String qualifiedTableName = getQualifiedTableName(schemaName, tableName);
+ String qualifiedIndexTableName = null;
+ if (indexTableName != null){
+ qualifiedIndexTableName = getQualifiedTableName(schemaName, indexTableName);
+ }
+
+ if (cmdLine.hasOption(ZK_QUORUM_OPT.getOpt())) {
+ // ZK_QUORUM_OPT is optional, but if it's there, use it for both the conn and the job.
+ String zkQuorum = cmdLine.getOptionValue(ZK_QUORUM_OPT.getOpt());
+ PhoenixDriver.ConnectionInfo info = PhoenixDriver.ConnectionInfo.create(zkQuorum);
+ LOG.info("Configuring HBase connection to {}", info);
+ for (Map.Entry<String,String> entry : info.asProps()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Setting {} = {}", entry.getKey(), entry.getValue());
+ }
+ conf.set(entry.getKey(), entry.getValue());
+ }
+ }
+
+ final Connection conn = QueryUtil.getConnection(conf);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Reading columns from {} :: {}", ((PhoenixConnection) conn).getURL(),
+ qualifiedTableName);
+ }
+ List<ColumnInfo> importColumns = buildImportColumns(conn, cmdLine, qualifiedTableName);
+ Preconditions.checkNotNull(importColumns);
+ Preconditions.checkArgument(!importColumns.isEmpty(), "Column info list is empty");
+ FormatToKeyValueMapper.configureColumnInfoList(conf, importColumns);
+ boolean ignoreInvalidRows = cmdLine.hasOption(IGNORE_ERRORS_OPT.getOpt());
+ conf.setBoolean(FormatToKeyValueMapper.IGNORE_INVALID_ROW_CONFKEY, ignoreInvalidRows);
+ conf.set(FormatToKeyValueMapper.TABLE_NAME_CONFKEY, tableName);
+
+ // give subclasses their hook
+ configureOptions(cmdLine, importColumns, conf);
+ try {
+ validateTable(conn, schemaName, tableName);
+ } finally {
+ conn.close();
+ }
+
+ final Path inputPath = new Path(cmdLine.getOptionValue(INPUT_PATH_OPT.getOpt()));
+ final Path outputPath;
+ if (cmdLine.hasOption(OUTPUT_PATH_OPT.getOpt())) {
+ outputPath = new Path(cmdLine.getOptionValue(OUTPUT_PATH_OPT.getOpt()));
+ } else {
+ outputPath = new Path("/tmp/" + UUID.randomUUID());
+ }
+
+ List<TargetTableRef> tablesToBeLoaded = new ArrayList<TargetTableRef>();
+ tablesToBeLoaded.add(new TargetTableRef(qualifiedTableName));
+ // using conn after it's been closed... o.O
+ tablesToBeLoaded.addAll(getIndexTables(conn, schemaName, qualifiedTableName));
+
+ // When loading a single index table, check index table name is correct
+ if (qualifiedIndexTableName != null){
+ TargetTableRef targetIndexRef = null;
+ for (TargetTableRef tmpTable : tablesToBeLoaded){
+ if (tmpTable.getLogicalName().compareToIgnoreCase(qualifiedIndexTableName) == 0) {
+ targetIndexRef = tmpTable;
+ break;
+ }
+ }
+ if (targetIndexRef == null){
+ throw new IllegalStateException("Bulk Loader error: index table " +
+ qualifiedIndexTableName + " doesn't exist");
+ }
+ tablesToBeLoaded.clear();
+ tablesToBeLoaded.add(targetIndexRef);
+ }
+
+ return submitJob(conf, tableName, inputPath, outputPath, tablesToBeLoaded);
+ }
+
+ /**
+ * Submits the jobs to the cluster.
+ * Loads the HFiles onto the respective tables.
+ */
+ public int submitJob(final Configuration conf, final String qualifiedTableName, final Path inputPath,
+ final Path outputPath , List<TargetTableRef> tablesToBeLoaded) {
+ try {
+ Job job = new Job(conf, "Phoenix MapReduce import for " + qualifiedTableName);
+ FileInputFormat.addInputPath(job, inputPath);
+ FileOutputFormat.setOutputPath(job, outputPath);
+
+ job.setInputFormatClass(TextInputFormat.class);
+ job.setMapOutputKeyClass(TableRowkeyPair.class);
+ job.setMapOutputValueClass(KeyValue.class);
+ job.setOutputKeyClass(TableRowkeyPair.class);
+ job.setOutputValueClass(KeyValue.class);
+ job.setReducerClass(FormatToKeyValueReducer.class);
+
+ MultiHfileOutputFormat.configureIncrementalLoad(job, tablesToBeLoaded);
+
+ final String tableNamesAsJson = TargetTableRefFunctions.NAMES_TO_JSON.apply(tablesToBeLoaded);
+ job.getConfiguration().set(FormatToKeyValueMapper.TABLE_NAMES_CONFKEY,tableNamesAsJson);
+
+ // give subclasses their hook
+ setupJob(job);
+
+ LOG.info("Running MapReduce import job from {} to {}", inputPath, outputPath);
+ boolean success = job.waitForCompletion(true);
+
+ if (success) {
+ LOG.info("Loading HFiles from {}", outputPath);
+ completebulkload(conf,outputPath,tablesToBeLoaded);
+ }
+
+ LOG.info("Removing output directory {}", outputPath);
+ if (!FileSystem.get(conf).delete(outputPath, true)) {
+ LOG.error("Removing output directory {} failed", outputPath);
+ }
+ return 0;
+ } catch(Exception e) {
+ LOG.error("Error {} occurred submitting BulkLoad ",e.getMessage());
+ return -1;
+ }
+
+ }
+
+ private void completebulkload(Configuration conf,Path outputPath , List<TargetTableRef> tablesToBeLoaded) throws Exception {
+ for(TargetTableRef table : tablesToBeLoaded) {
+ LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
+ String tableName = table.getPhysicalName();
+ Path tableOutputPath = new Path(outputPath,tableName);
+ HTable htable = new HTable(conf,tableName);
+ LOG.info("Loading HFiles for {} from {}", tableName , tableOutputPath);
+ loader.doBulkLoad(tableOutputPath, htable);
+ LOG.info("Incremental load complete for table=" + tableName);
+ }
+ }
+
+ /**
+ * Build up the list of columns to be imported. The list is taken from the command line if
+ * present, otherwise it is taken from the table description.
+ *
+ * @param conn connection to Phoenix
+ * @param cmdLine supplied command line options
+ * @param qualifiedTableName table name (possibly with schema) of the table to be imported
+ * @return the list of columns to be imported
+ */
+ List<ColumnInfo> buildImportColumns(Connection conn, CommandLine cmdLine,
+ String qualifiedTableName) throws SQLException {
+ List<String> userSuppliedColumnNames = null;
+ if (cmdLine.hasOption(IMPORT_COLUMNS_OPT.getOpt())) {
+ userSuppliedColumnNames = Lists.newArrayList(
+ Splitter.on(",").trimResults().split
+ (cmdLine.getOptionValue(IMPORT_COLUMNS_OPT.getOpt())));
+ }
+ return SchemaUtil.generateColumnInfo(
+ conn, qualifiedTableName, userSuppliedColumnNames, true);
+ }
+
+ /**
+ * Calculate the HBase HTable name for which the import is to be done.
+ *
+ * @param schemaName import schema name, can be null
+ * @param tableName import table name
+ * @return the byte representation of the import HTable
+ */
+ @VisibleForTesting
+ static String getQualifiedTableName(String schemaName, String tableName) {
+ if (schemaName != null) {
+ return String.format("%s.%s", SchemaUtil.normalizeIdentifier(schemaName),
+ SchemaUtil.normalizeIdentifier(tableName));
+ } else {
+ return SchemaUtil.normalizeIdentifier(tableName);
+ }
+ }
+
+ /**
+ * Perform any required validation on the table being bulk loaded into:
+ * - ensure no column family names start with '_', as they'd be ignored leading to problems.
+ * @throws java.sql.SQLException
+ */
+ private void validateTable(Connection conn, String schemaName,
+ String tableName) throws SQLException {
+
+ ResultSet rs = conn.getMetaData().getColumns(
+ null, StringUtil.escapeLike(schemaName),
+ StringUtil.escapeLike(tableName), null);
+ while (rs.next()) {
+ String familyName = rs.getString(PhoenixDatabaseMetaData.COLUMN_FAMILY);
+ if (familyName != null && familyName.startsWith("_")) {
+ if (QueryConstants.DEFAULT_COLUMN_FAMILY.equals(familyName)) {
+ throw new IllegalStateException(
+ "Bulk Loader error: All column names that are not part of the " +
+ "primary key constraint must be prefixed with a column family " +
+ "name (i.e. f.my_column VARCHAR)");
+ } else {
+ throw new IllegalStateException("Bulk Loader error: Column family name " +
+ "must not start with '_': " + familyName);
+ }
+ }
+ }
+ rs.close();
+ }
+
+ /**
+ * Get the index tables of current data table
+ * @throws java.sql.SQLException
+ */
+ private List<TargetTableRef> getIndexTables(Connection conn, String schemaName, String qualifiedTableName)
+ throws SQLException {
+ PTable table = PhoenixRuntime.getTable(conn, qualifiedTableName);
+ List<TargetTableRef> indexTables = new ArrayList<TargetTableRef>();
+ for(PTable indexTable : table.getIndexes()){
+ if (indexTable.getIndexType() == PTable.IndexType.LOCAL) {
+ throw new UnsupportedOperationException("Local indexes not supported by Bulk Loader");
+ /*indexTables.add(
+ new TargetTableRef(getQualifiedTableName(schemaName,
+ indexTable.getTableName().getString()),
+ MetaDataUtil.getLocalIndexTableName(qualifiedTableName))); */
+ } else {
+ indexTables.add(new TargetTableRef(getQualifiedTableName(schemaName,
+ indexTable.getTableName().getString())));
+ }
+ }
+ return indexTables;
+ }
+
+}
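AbstractBulkLoadTool leaves two hooks for format-specific subclasses: configureOptions(...) to translate command-line options into job configuration, and setupJob(...) to wire up the mapper. A minimal sketch of a hypothetical new format plugging into those hooks (the class name, option key, and mapper named here are invented for illustration, not part of this patch):

import java.sql.SQLException;
import java.util.List;

import org.apache.commons.cli.CommandLine;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.phoenix.mapreduce.AbstractBulkLoadTool;
import org.apache.phoenix.util.ColumnInfo;

public class TsvBulkLoadTool extends AbstractBulkLoadTool {

    @Override
    protected void configureOptions(CommandLine cmdLine, List<ColumnInfo> importColumns,
            Configuration conf) throws SQLException {
        // Format-specific configuration goes here; the real subclasses
        // (CsvBulkLoadTool, JsonBulkLoadTool) configure their parsers this way.
        conf.set("example.tsv.delimiter", "\t");
    }

    @Override
    protected void setupJob(Job job) {
        // A real subclass sets its own mapper class here, e.g.
        // job.setMapperClass(TsvToKeyValueMapper.class); -- TsvToKeyValueMapper
        // is purely illustrative and does not exist in this patch.
        job.setJarByClass(TsvBulkLoadTool.class);
    }
}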
http://git-wip-us.apache.org/repos/asf/phoenix/blob/578979a1/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkImportUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkImportUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkImportUtil.java
index 6d77cd5..bdc67f9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkImportUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkImportUtil.java
@@ -17,15 +17,11 @@
*/
package org.apache.phoenix.mapreduce;
-import java.util.List;
-
-import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.util.Base64;
import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
-import org.apache.phoenix.util.ColumnInfo;
-import com.google.common.base.Preconditions;
+import com.google.common.annotations.VisibleForTesting;
/**
* Collection of utility methods for setting up bulk import jobs.
@@ -36,29 +32,19 @@ public class CsvBulkImportUtil {
* Configure a job configuration for a bulk CSV import.
*
* @param conf job configuration to be set up
- * @param tableName name of the table to be imported to, can include a schema name
* @param fieldDelimiter field delimiter character for the CSV input
* @param quoteChar quote character for the CSV input
* @param escapeChar escape character for the CSV input
* @param arrayDelimiter array delimiter character, can be null
- * @param columnInfoList list of columns to be imported
- * @param ignoreInvalidRows flag to ignore invalid input rows
*/
- public static void initCsvImportJob(Configuration conf, String tableName, char fieldDelimiter, char quoteChar, char escapeChar,
- String arrayDelimiter, List<ColumnInfo> columnInfoList, boolean ignoreInvalidRows) {
-
- Preconditions.checkNotNull(tableName);
- Preconditions.checkNotNull(columnInfoList);
- Preconditions.checkArgument(!columnInfoList.isEmpty(), "Column info list is empty");
- conf.set(CsvToKeyValueMapper.TABLE_NAME_CONFKEY, tableName);
+ public static void initCsvImportJob(Configuration conf, char fieldDelimiter, char quoteChar,
+ char escapeChar, String arrayDelimiter) {
setChar(conf, CsvToKeyValueMapper.FIELD_DELIMITER_CONFKEY, fieldDelimiter);
setChar(conf, CsvToKeyValueMapper.QUOTE_CHAR_CONFKEY, quoteChar);
setChar(conf, CsvToKeyValueMapper.ESCAPE_CHAR_CONFKEY, escapeChar);
if (arrayDelimiter != null) {
conf.set(CsvToKeyValueMapper.ARRAY_DELIMITER_CONFKEY, arrayDelimiter);
}
- CsvToKeyValueMapper.configureColumnInfoList(conf, columnInfoList);
- conf.setBoolean(CsvToKeyValueMapper.IGNORE_INVALID_ROW_CONFKEY, ignoreInvalidRows);
}
/**
http://git-wip-us.apache.org/repos/asf/phoenix/blob/578979a1/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java
index 20f05ff..e0b083e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java
@@ -17,364 +17,37 @@
*/
package org.apache.phoenix.mapreduce;
-import java.io.IOException;
-import java.sql.Connection;
-import java.sql.ResultSet;
import java.sql.SQLException;
-import java.util.ArrayList;
import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.ExecutionException;
import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
-import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
-import org.apache.commons.cli.PosixParser;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
-import org.apache.phoenix.jdbc.PhoenixConnection;
-import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
-import org.apache.phoenix.jdbc.PhoenixDriver;
-import org.apache.phoenix.mapreduce.bulkload.CsvTableRowkeyPair;
-import org.apache.phoenix.query.QueryConstants;
-import org.apache.phoenix.schema.PTable;
-import org.apache.phoenix.schema.PTable.IndexType;
-import org.apache.phoenix.util.CSVCommonsLoader;
import org.apache.phoenix.util.ColumnInfo;
-import org.apache.phoenix.util.PhoenixRuntime;
-import org.apache.phoenix.util.QueryUtil;
-import org.apache.phoenix.util.SchemaUtil;
-import org.apache.phoenix.util.StringUtil;
-import org.codehaus.jackson.annotate.JsonCreator;
-import org.codehaus.jackson.annotate.JsonProperty;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.base.Splitter;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
+public class CsvBulkLoadTool extends AbstractBulkLoadTool {
-/**
- * Base tool for running MapReduce-based ingests of data.
- */
-@SuppressWarnings("deprecation")
-public class CsvBulkLoadTool extends Configured implements Tool {
-
- private static final Logger LOG = LoggerFactory.getLogger(CsvBulkLoadTool.class);
-
- static final Option ZK_QUORUM_OPT = new Option("z", "zookeeper", true, "Supply zookeeper connection details (optional)");
- static final Option INPUT_PATH_OPT = new Option("i", "input", true, "Input CSV path (mandatory)");
- static final Option OUTPUT_PATH_OPT = new Option("o", "output", true, "Output path for temporary HFiles (optional)");
- static final Option SCHEMA_NAME_OPT = new Option("s", "schema", true, "Phoenix schema name (optional)");
- static final Option TABLE_NAME_OPT = new Option("t", "table", true, "Phoenix table name (mandatory)");
- static final Option INDEX_TABLE_NAME_OPT = new Option("it", "index-table", true, "Phoenix index table name when just loading this particualar index table");
static final Option DELIMITER_OPT = new Option("d", "delimiter", true, "Input delimiter, defaults to comma");
static final Option QUOTE_OPT = new Option("q", "quote", true, "Supply a custom phrase delimiter, defaults to double quote character");
static final Option ESCAPE_OPT = new Option("e", "escape", true, "Supply a custom escape character, default is a backslash");
static final Option ARRAY_DELIMITER_OPT = new Option("a", "array-delimiter", true, "Array element delimiter (optional)");
- static final Option IMPORT_COLUMNS_OPT = new Option("c", "import-columns", true, "Comma-separated list of columns to be imported");
- static final Option IGNORE_ERRORS_OPT = new Option("g", "ignore-errors", false, "Ignore input errors");
- static final Option HELP_OPT = new Option("h", "help", false, "Show this help and quit");
-
- public static void main(String[] args) throws Exception {
- int exitStatus = ToolRunner.run(new CsvBulkLoadTool(), args);
- System.exit(exitStatus);
- }
-
- /**
- * Parses the commandline arguments, throws IllegalStateException if mandatory arguments are
- * missing.
- *
- * @param args supplied command line arguments
- * @return the parsed command line
- */
- CommandLine parseOptions(String[] args) {
-
- Options options = getOptions();
-
- CommandLineParser parser = new PosixParser();
- CommandLine cmdLine = null;
- try {
- cmdLine = parser.parse(options, args);
- } catch (ParseException e) {
- printHelpAndExit("Error parsing command line options: " + e.getMessage(), options);
- }
-
- if (cmdLine.hasOption(HELP_OPT.getOpt())) {
- printHelpAndExit(options, 0);
- }
-
- if (!cmdLine.hasOption(TABLE_NAME_OPT.getOpt())) {
- throw new IllegalStateException(TABLE_NAME_OPT.getLongOpt() + " is a mandatory " +
- "parameter");
- }
-
- if (!cmdLine.getArgList().isEmpty()) {
- throw new IllegalStateException("Got unexpected extra parameters: "
- + cmdLine.getArgList());
- }
-
- if (!cmdLine.hasOption(INPUT_PATH_OPT.getOpt())) {
- throw new IllegalStateException(INPUT_PATH_OPT.getLongOpt() + " is a mandatory " +
- "parameter");
- }
- return cmdLine;
- }
-
- private Options getOptions() {
- Options options = new Options();
- options.addOption(INPUT_PATH_OPT);
- options.addOption(TABLE_NAME_OPT);
- options.addOption(INDEX_TABLE_NAME_OPT);
- options.addOption(ZK_QUORUM_OPT);
- options.addOption(OUTPUT_PATH_OPT);
- options.addOption(SCHEMA_NAME_OPT);
+ @Override
+ protected Options getOptions() {
+ Options options = super.getOptions();
options.addOption(DELIMITER_OPT);
options.addOption(QUOTE_OPT);
options.addOption(ESCAPE_OPT);
options.addOption(ARRAY_DELIMITER_OPT);
- options.addOption(IMPORT_COLUMNS_OPT);
- options.addOption(IGNORE_ERRORS_OPT);
- options.addOption(HELP_OPT);
return options;
}
-
- private void printHelpAndExit(String errorMessage, Options options) {
- System.err.println(errorMessage);
- printHelpAndExit(options, 1);
- }
-
- private void printHelpAndExit(Options options, int exitCode) {
- HelpFormatter formatter = new HelpFormatter();
- formatter.printHelp("help", options);
- System.exit(exitCode);
- }
-
@Override
- public int run(String[] args) throws Exception {
-
- Configuration conf = HBaseConfiguration.create(getConf());
-
- CommandLine cmdLine = null;
- try {
- cmdLine = parseOptions(args);
- } catch (IllegalStateException e) {
- printHelpAndExit(e.getMessage(), getOptions());
- }
- return loadData(conf, cmdLine);
- }
-
- private int loadData(Configuration conf, CommandLine cmdLine) throws SQLException,
- InterruptedException, ExecutionException, ClassNotFoundException {
- String tableName = cmdLine.getOptionValue(TABLE_NAME_OPT.getOpt());
- String schemaName = cmdLine.getOptionValue(SCHEMA_NAME_OPT.getOpt());
- String indexTableName = cmdLine.getOptionValue(INDEX_TABLE_NAME_OPT.getOpt());
- String qualifiedTableName = getQualifiedTableName(schemaName, tableName);
- String qualifiedIndexTableName = null;
- if (indexTableName != null){
- qualifiedIndexTableName = getQualifiedTableName(schemaName, indexTableName);
- }
-
- if (cmdLine.hasOption(ZK_QUORUM_OPT.getOpt())) {
- // ZK_QUORUM_OPT is optional, but if it's there, use it for both the conn and the job.
- String zkQuorum = cmdLine.getOptionValue(ZK_QUORUM_OPT.getOpt());
- PhoenixDriver.ConnectionInfo info = PhoenixDriver.ConnectionInfo.create(zkQuorum);
- LOG.info("Configuring HBase connection to {}", info);
- for (Map.Entry<String,String> entry : info.asProps()) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Setting {} = {}", entry.getKey(), entry.getValue());
- }
- conf.set(entry.getKey(), entry.getValue());
- }
- }
-
- final Connection conn = QueryUtil.getConnection(conf);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Reading columns from {} :: {}", ((PhoenixConnection) conn).getURL(),
- qualifiedTableName);
- }
- List<ColumnInfo> importColumns = buildImportColumns(conn, cmdLine, qualifiedTableName);
- configureOptions(cmdLine, importColumns, conf);
- try {
- validateTable(conn, schemaName, tableName);
- } finally {
- conn.close();
- }
-
- final Path inputPath = new Path(cmdLine.getOptionValue(INPUT_PATH_OPT.getOpt()));
- final Path outputPath;
- if (cmdLine.hasOption(OUTPUT_PATH_OPT.getOpt())) {
- outputPath = new Path(cmdLine.getOptionValue(OUTPUT_PATH_OPT.getOpt()));
- } else {
- outputPath = new Path("/tmp/" + UUID.randomUUID());
- }
-
- List<TargetTableRef> tablesToBeLoaded = new ArrayList<TargetTableRef>();
- tablesToBeLoaded.add(new TargetTableRef(qualifiedTableName));
- // using conn after it's been closed... o.O
- tablesToBeLoaded.addAll(getIndexTables(conn, schemaName, qualifiedTableName));
-
- // When loading a single index table, check index table name is correct
- if (qualifiedIndexTableName != null){
- TargetTableRef targetIndexRef = null;
- for (TargetTableRef tmpTable : tablesToBeLoaded){
- if (tmpTable.getLogicalName().compareToIgnoreCase(qualifiedIndexTableName) == 0) {
- targetIndexRef = tmpTable;
- break;
- }
- }
- if (targetIndexRef == null){
- throw new IllegalStateException("CSV Bulk Loader error: index table " +
- qualifiedIndexTableName + " doesn't exist");
- }
- tablesToBeLoaded.clear();
- tablesToBeLoaded.add(targetIndexRef);
- }
-
- return submitJob(conf, tableName, inputPath, outputPath, tablesToBeLoaded);
- }
-
- /**
- * Submits the jobs to the cluster.
- * Loads the HFiles onto the respective tables.
- * @param configuration
- * @param qualifiedTableName
- * @param inputPath
- * @param outputPath
- * @param tablesToBeoaded
- * @return status
- */
- public int submitJob(final Configuration conf, final String qualifiedTableName, final Path inputPath,
- final Path outputPath , List<TargetTableRef> tablesToBeLoaded) {
- try {
- Job job = new Job(conf, "Phoenix MapReduce import for " + qualifiedTableName);
-
- // Allow overriding the job jar setting by using a -D system property at startup
- if (job.getJar() == null) {
- job.setJarByClass(CsvToKeyValueMapper.class);
- }
- job.setInputFormatClass(TextInputFormat.class);
- FileInputFormat.addInputPath(job, inputPath);
- FileOutputFormat.setOutputPath(job, outputPath);
- job.setMapperClass(CsvToKeyValueMapper.class);
- job.setMapOutputKeyClass(CsvTableRowkeyPair.class);
- job.setMapOutputValueClass(KeyValue.class);
- job.setOutputKeyClass(CsvTableRowkeyPair.class);
- job.setOutputValueClass(KeyValue.class);
- job.setReducerClass(CsvToKeyValueReducer.class);
-
- MultiHfileOutputFormat.configureIncrementalLoad(job, tablesToBeLoaded);
-
- final String tableNamesAsJson = TargetTableRefFunctions.NAMES_TO_JSON.apply(tablesToBeLoaded);
- job.getConfiguration().set(CsvToKeyValueMapper.TABLE_NAMES_CONFKEY,tableNamesAsJson);
-
- LOG.info("Running MapReduce import job from {} to {}", inputPath, outputPath);
- boolean success = job.waitForCompletion(true);
-
- if (success) {
- LOG.info("Loading HFiles from {}", outputPath);
- completebulkload(conf,outputPath,tablesToBeLoaded);
- }
-
- LOG.info("Removing output directory {}", outputPath);
- if (!FileSystem.get(conf).delete(outputPath, true)) {
- LOG.error("Removing output directory {} failed", outputPath);
- }
- return 0;
- } catch(Exception e) {
- LOG.error("Error {} occurred submitting CSVBulkLoad ",e.getMessage());
- return -1;
- }
-
- }
-
- /**
- * bulkload HFiles .
- * @param conf
- * @param outputPath
- * @param tablesToBeLoaded
- * @throws Exception
- */
- private void completebulkload(Configuration conf,Path outputPath , List<TargetTableRef> tablesToBeLoaded) throws Exception {
- for(TargetTableRef table : tablesToBeLoaded) {
- LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
- String tableName = table.getPhysicalName();
- Path tableOutputPath = new Path(outputPath,tableName);
- HTable htable = new HTable(conf,tableName);
- LOG.info("Loading HFiles for {} from {}", tableName , tableOutputPath);
- loader.doBulkLoad(tableOutputPath, htable);
- LOG.info("Incremental load complete for table=" + tableName);
- }
- }
-
- /**
- * Build up the list of columns to be imported. The list is taken from the command line if
- * present, otherwise it is taken from the table description.
- *
- * @param conn connection to Phoenix
- * @param cmdLine supplied command line options
- * @param qualifiedTableName table name (possibly with schema) of the table to be imported
- * @return the list of columns to be imported
- */
- List<ColumnInfo> buildImportColumns(Connection conn, CommandLine cmdLine,
- String qualifiedTableName) throws SQLException {
- List<String> userSuppliedColumnNames = null;
- if (cmdLine.hasOption(IMPORT_COLUMNS_OPT.getOpt())) {
- userSuppliedColumnNames = Lists.newArrayList(
- Splitter.on(",").trimResults().split
- (cmdLine.getOptionValue(IMPORT_COLUMNS_OPT.getOpt())));
- }
- return CSVCommonsLoader.generateColumnInfo(
- conn, qualifiedTableName, userSuppliedColumnNames, true);
- }
-
- /**
- * Calculate the HBase HTable name for which the import is to be done.
- *
- * @param schemaName import schema name, can be null
- * @param tableName import table name
- * @return the byte representation of the import HTable
- */
- @VisibleForTesting
- static String getQualifiedTableName(String schemaName, String tableName) {
- if (schemaName != null) {
- return String.format("%s.%s", SchemaUtil.normalizeIdentifier(schemaName),
- SchemaUtil.normalizeIdentifier(tableName));
- } else {
- return SchemaUtil.normalizeIdentifier(tableName);
- }
- }
-
- /**
- * Set configuration values based on parsed command line options.
- *
- * @param cmdLine supplied command line options
- * @param importColumns descriptors of columns to be imported
- * @param conf job configuration
- */
- private static void configureOptions(CommandLine cmdLine, List<ColumnInfo> importColumns,
- Configuration conf) throws SQLException {
+ protected void configureOptions(CommandLine cmdLine, List<ColumnInfo> importColumns,
+ Configuration conf) throws SQLException {
// we don't parse ZK_QUORUM_OPT here because we need it in order to
// create the connection we need to build importColumns.
@@ -408,178 +81,23 @@ public class CsvBulkLoadTool extends Configured implements Tool {
CsvBulkImportUtil.initCsvImportJob(
conf,
- getQualifiedTableName(
- cmdLine.getOptionValue(SCHEMA_NAME_OPT.getOpt()),
- cmdLine.getOptionValue(TABLE_NAME_OPT.getOpt())),
delimiterChar,
quoteChar,
escapeChar,
- cmdLine.getOptionValue(ARRAY_DELIMITER_OPT.getOpt()),
- importColumns,
- cmdLine.hasOption(IGNORE_ERRORS_OPT.getOpt()));
+ cmdLine.getOptionValue(ARRAY_DELIMITER_OPT.getOpt()));
}
- /**
- * Perform any required validation on the table being bulk loaded into:
- * - ensure no column family names start with '_', as they'd be ignored leading to problems.
- * @throws java.sql.SQLException
- */
- private void validateTable(Connection conn, String schemaName,
- String tableName) throws SQLException {
-
- ResultSet rs = conn.getMetaData().getColumns(
- null, StringUtil.escapeLike(schemaName),
- StringUtil.escapeLike(tableName), null);
- while (rs.next()) {
- String familyName = rs.getString(PhoenixDatabaseMetaData.COLUMN_FAMILY);
- if (familyName != null && familyName.startsWith("_")) {
- if (QueryConstants.DEFAULT_COLUMN_FAMILY.equals(familyName)) {
- throw new IllegalStateException(
- "CSV Bulk Loader error: All column names that are not part of the " +
- "primary key constraint must be prefixed with a column family " +
- "name (i.e. f.my_column VARCHAR)");
- } else {
- throw new IllegalStateException("CSV Bulk Loader error: Column family name " +
- "must not start with '_': " + familyName);
- }
- }
- }
- rs.close();
- }
-
- /**
- * Get the index tables of current data table
- * @throws java.sql.SQLException
- */
- private List<TargetTableRef> getIndexTables(Connection conn, String schemaName, String qualifiedTableName)
- throws SQLException {
- PTable table = PhoenixRuntime.getTable(conn, qualifiedTableName);
- List<TargetTableRef> indexTables = new ArrayList<TargetTableRef>();
- for(PTable indexTable : table.getIndexes()){
- if (indexTable.getIndexType() == IndexType.LOCAL) {
- throw new UnsupportedOperationException("Local indexes not supported by CSV Bulk Loader");
- /*indexTables.add(
- new TargetTableRef(getQualifiedTableName(schemaName,
- indexTable.getTableName().getString()),
- MetaDataUtil.getLocalIndexTableName(qualifiedTableName))); */
- } else {
- indexTables.add(new TargetTableRef(getQualifiedTableName(schemaName,
- indexTable.getTableName().getString())));
- }
+ @Override
+ protected void setupJob(Job job) {
+ // Allow overriding the job jar setting by using a -D system property at startup
+ if (job.getJar() == null) {
+ job.setJarByClass(CsvToKeyValueMapper.class);
}
- return indexTables;
+ job.setMapperClass(CsvToKeyValueMapper.class);
}
- /**
- * Represents the logical and physical name of a single table to which data is to be loaded.
- *
- * This class exists to allow for the difference between HBase physical table names and
- * Phoenix logical table names.
- */
- static class TargetTableRef {
-
- @JsonProperty
- private final String logicalName;
-
- @JsonProperty
- private final String physicalName;
-
- @JsonProperty
- private Map<String,String> configuration = Maps.newHashMap();
-
- private TargetTableRef(String name) {
- this(name, name);
- }
-
- @JsonCreator
- private TargetTableRef(@JsonProperty("logicalName") String logicalName, @JsonProperty("physicalName") String physicalName) {
- this.logicalName = logicalName;
- this.physicalName = physicalName;
- }
-
- public String getLogicalName() {
- return logicalName;
- }
-
- public String getPhysicalName() {
- return physicalName;
- }
-
- public Map<String, String> getConfiguration() {
- return configuration;
- }
-
- public void setConfiguration(Map<String, String> configuration) {
- this.configuration = configuration;
- }
+ public static void main(String[] args) throws Exception {
+ int exitStatus = ToolRunner.run(new CsvBulkLoadTool(), args);
+ System.exit(exitStatus);
}
-
- /**
- * Utility functions to get/put json.
- *
- */
- static class TargetTableRefFunctions {
-
- public static Function<TargetTableRef,String> TO_JSON = new Function<TargetTableRef,String>() {
-
- @Override
- public String apply(TargetTableRef input) {
- try {
- ObjectMapper mapper = new ObjectMapper();
- return mapper.writeValueAsString(input);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
-
- }
- };
-
- public static Function<String,TargetTableRef> FROM_JSON = new Function<String,TargetTableRef>() {
-
- @Override
- public TargetTableRef apply(String json) {
- try {
- ObjectMapper mapper = new ObjectMapper();
- return mapper.readValue(json, TargetTableRef.class);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
-
- }
- };
-
- public static Function<List<TargetTableRef>,String> NAMES_TO_JSON = new Function<List<TargetTableRef>,String>() {
-
- @Override
- public String apply(List<TargetTableRef> input) {
- try {
- List<String> tableNames = Lists.newArrayListWithCapacity(input.size());
- for(TargetTableRef table : input) {
- tableNames.add(table.getPhysicalName());
- }
- ObjectMapper mapper = new ObjectMapper();
- return mapper.writeValueAsString(tableNames);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
-
- }
- };
-
- public static Function<String,List<String>> NAMES_FROM_JSON = new Function<String,List<String>>() {
-
- @SuppressWarnings("unchecked")
- @Override
- public List<String> apply(String json) {
- try {
- ObjectMapper mapper = new ObjectMapper();
- return mapper.readValue(json, ArrayList.class);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
-
- }
- };
- }
-
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/578979a1/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java
index c3b5a7d..5a5d378 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java
@@ -19,46 +19,20 @@ package org.apache.phoenix.mapreduce;
import java.io.IOException;
import java.io.StringReader;
-import java.sql.SQLException;
-import java.util.Iterator;
import java.util.List;
-import java.util.Map.Entry;
-import java.util.Properties;
-
-import javax.annotation.Nullable;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVParser;
import org.apache.commons.csv.CSVRecord;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.phoenix.jdbc.PhoenixConnection;
-import org.apache.phoenix.mapreduce.CsvBulkLoadTool.TargetTableRefFunctions;
-import org.apache.phoenix.mapreduce.bulkload.CsvTableRowkeyPair;
-import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
import org.apache.phoenix.util.CSVCommonsLoader;
import org.apache.phoenix.util.ColumnInfo;
-import org.apache.phoenix.util.PhoenixRuntime;
-import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.UpsertExecutor;
import org.apache.phoenix.util.csv.CsvUpsertExecutor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
-import com.google.common.base.Splitter;
-import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
/**
* MapReduce mapper that converts CSV input lines into KeyValues that can be written to HFiles.
@@ -67,12 +41,7 @@ import com.google.common.collect.Lists;
* extracting the created KeyValues and rolling back the statement execution before it is
* committed to HBase.
*/
-public class CsvToKeyValueMapper extends Mapper<LongWritable,Text,CsvTableRowkeyPair,
- KeyValue> {
-
- private static final Logger LOG = LoggerFactory.getLogger(CsvToKeyValueMapper.class);
-
- private static final String COUNTER_GROUP_NAME = "Phoenix MapReduce Import";
+public class CsvToKeyValueMapper extends FormatToKeyValueMapper<CSVRecord> {
/** Configuration key for the field delimiter for input csv records */
public static final String FIELD_DELIMITER_CONFKEY = "phoenix.mapreduce.import.fielddelimiter";
@@ -86,117 +55,25 @@ public class CsvToKeyValueMapper extends Mapper<LongWritable,Text,CsvTableRowkey
/** Configuration key for the array element delimiter for input arrays */
public static final String ARRAY_DELIMITER_CONFKEY = "phoenix.mapreduce.import.arraydelimiter";
- /** Configuration key for the name of the output table */
- public static final String TABLE_NAME_CONFKEY = "phoenix.mapreduce.import.tablename";
-
- /** Configuration key for the name of the output index table */
- public static final String INDEX_TABLE_NAME_CONFKEY = "phoenix.mapreduce.import.indextablename";
-
- /** Configuration key for the columns to be imported */
- public static final String COLUMN_INFO_CONFKEY = "phoenix.mapreduce.import.columninfos";
+ private CsvLineParser lineParser;
- /** Configuration key for the flag to ignore invalid rows */
- public static final String IGNORE_INVALID_ROW_CONFKEY = "phoenix.mapreduce.import.ignoreinvalidrow";
-
- /** Configuration key for the table names */
- public static final String TABLE_NAMES_CONFKEY = "phoenix.mapreduce.import.tablenames";
-
- /** Configuration key for the table configurations */
- public static final String TABLE_CONFIG_CONFKEY = "phoenix.mapreduce.import.table.config";
-
- private PhoenixConnection conn;
- private CsvUpsertExecutor csvUpsertExecutor;
- private MapperUpsertListener upsertListener;
- private CsvLineParser csvLineParser;
- private ImportPreUpsertKeyValueProcessor preUpdateProcessor;
- private List<String> tableNames;
+ @Override
+ protected LineParser<CSVRecord> getLineParser() {
+ return lineParser;
+ }
@Override
protected void setup(Context context) throws IOException, InterruptedException {
-
Configuration conf = context.getConfiguration();
-
- // pass client configuration into driver
- Properties clientInfos = new Properties();
- Iterator<Entry<String, String>> iterator = conf.iterator();
- while(iterator.hasNext()) {
- Entry<String,String> entry = iterator.next();
- clientInfos.setProperty(entry.getKey(), entry.getValue());
- }
-
- try {
- conn = (PhoenixConnection) QueryUtil.getConnection(clientInfos, conf);
- } catch (SQLException | ClassNotFoundException e) {
- throw new RuntimeException(e);
- }
-
- final String tableNamesConf = conf.get(TABLE_NAMES_CONFKEY);
- tableNames = TargetTableRefFunctions.NAMES_FROM_JSON.apply(tableNamesConf);
-
- upsertListener = new MapperUpsertListener(
- context, conf.getBoolean(IGNORE_INVALID_ROW_CONFKEY, true));
- csvUpsertExecutor = buildUpsertExecutor(conf);
- csvLineParser = new CsvLineParser(
+ lineParser = new CsvLineParser(
CsvBulkImportUtil.getCharacter(conf, FIELD_DELIMITER_CONFKEY),
CsvBulkImportUtil.getCharacter(conf, QUOTE_CHAR_CONFKEY),
CsvBulkImportUtil.getCharacter(conf, ESCAPE_CHAR_CONFKEY));
-
- preUpdateProcessor = PhoenixConfigurationUtil.loadPreUpsertProcessor(conf);
- }
-
- @SuppressWarnings("deprecation")
- @Override
- protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
- try {
- CSVRecord csvRecord = null;
- try {
- csvRecord = csvLineParser.parse(value.toString());
- } catch (IOException e) {
- context.getCounter(COUNTER_GROUP_NAME, "CSV Parser errors").increment(1L);
- }
-
- if (csvRecord == null) {
- context.getCounter(COUNTER_GROUP_NAME, "Empty records").increment(1L);
- return;
- }
- csvUpsertExecutor.execute(ImmutableList.of(csvRecord));
-
- Iterator<Pair<byte[], List<KeyValue>>> uncommittedDataIterator
- = PhoenixRuntime.getUncommittedDataIterator(conn, true);
- while (uncommittedDataIterator.hasNext()) {
- Pair<byte[], List<KeyValue>> kvPair = uncommittedDataIterator.next();
- List<KeyValue> keyValueList = kvPair.getSecond();
- keyValueList = preUpdateProcessor.preUpsert(kvPair.getFirst(), keyValueList);
- byte[] first = kvPair.getFirst();
- for(String tableName : tableNames) {
- if (Bytes.compareTo(Bytes.toBytes(tableName), first) != 0) {
- // skip edits for other tables
- continue;
- }
- for (KeyValue kv : keyValueList) {
- ImmutableBytesWritable outputKey = new ImmutableBytesWritable();
- outputKey.set(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength());
- context.write(new CsvTableRowkeyPair(tableName, outputKey), kv);
- }
- }
- }
- conn.rollback();
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- protected void cleanup(Context context) throws IOException, InterruptedException {
- try {
- conn.close();
- } catch (SQLException e) {
- throw new RuntimeException(e);
- }
}
@VisibleForTesting
- CsvUpsertExecutor buildUpsertExecutor(Configuration conf) {
+ @Override
+ protected UpsertExecutor<CSVRecord, ?> buildUpsertExecutor(Configuration conf) {
String tableName = conf.get(TABLE_NAME_CONFKEY);
String arraySeparator = conf.get(ARRAY_DELIMITER_CONFKEY,
CSVCommonsLoader.DEFAULT_ARRAY_ELEMENT_SEPARATOR);
@@ -204,78 +81,14 @@ public class CsvToKeyValueMapper extends Mapper<LongWritable,Text,CsvTableRowkey
List<ColumnInfo> columnInfoList = buildColumnInfoList(conf);
- return CsvUpsertExecutor.create(conn, tableName, columnInfoList, upsertListener, arraySeparator);
- }
-
- /**
- * Write the list of to-import columns to a job configuration.
- *
- * @param conf configuration to be written to
- * @param columnInfoList list of ColumnInfo objects to be configured for import
- */
- @VisibleForTesting
- static void configureColumnInfoList(Configuration conf, List<ColumnInfo> columnInfoList) {
- conf.set(COLUMN_INFO_CONFKEY, Joiner.on("|").useForNull("").join(columnInfoList));
- }
-
- /**
- * Build the list of ColumnInfos for the import based on information in the configuration.
- */
- @VisibleForTesting
- static List<ColumnInfo> buildColumnInfoList(Configuration conf) {
-
- return Lists.newArrayList(
- Iterables.transform(
- Splitter.on("|").split(conf.get(COLUMN_INFO_CONFKEY)),
- new Function<String, ColumnInfo>() {
- @Nullable
- @Override
- public ColumnInfo apply(@Nullable String input) {
- if (input.isEmpty()) {
- // An empty string represents a null that was passed in to
- // the configuration, which corresponds to an input column
- // which is to be skipped
- return null;
- }
- return ColumnInfo.fromString(input);
- }
- }));
- }
-
- /**
- * Listener that logs successful upserts and errors to job counters.
- */
- @VisibleForTesting
- static class MapperUpsertListener implements CsvUpsertExecutor.UpsertListener {
-
- private final Context context;
- private final boolean ignoreRecordErrors;
-
- private MapperUpsertListener(Context context, boolean ignoreRecordErrors) {
- this.context = context;
- this.ignoreRecordErrors = ignoreRecordErrors;
- }
-
- @Override
- public void upsertDone(long upsertCount) {
- context.getCounter(COUNTER_GROUP_NAME, "Upserts Done").increment(1L);
- }
-
- @Override
- public void errorOnRecord(CSVRecord csvRecord, Throwable throwable) {
- LOG.error("Error on record " + csvRecord, throwable);
- context.getCounter(COUNTER_GROUP_NAME, "Errors on records").increment(1L);
- if (!ignoreRecordErrors) {
- throw Throwables.propagate(throwable);
- }
- }
+ return new CsvUpsertExecutor(conn, tableName, columnInfoList, upsertListener, arraySeparator);
}
/**
* Parses a single CSV input line, returning a {@code CSVRecord}.
*/
@VisibleForTesting
- static class CsvLineParser {
+ static class CsvLineParser implements LineParser<CSVRecord> {
private final CSVFormat csvFormat;
CsvLineParser(char fieldDelimiter, char quote, char escape) {
@@ -286,6 +99,7 @@ public class CsvToKeyValueMapper extends Mapper<LongWritable,Text,CsvTableRowkey
.withQuote(quote);
}
+ @Override
public CSVRecord parse(String input) throws IOException {
// TODO Creating a new parser for each line seems terribly inefficient but
// there's no public way to parse single lines via commons-csv. We should update
@@ -294,18 +108,4 @@ public class CsvToKeyValueMapper extends Mapper<LongWritable,Text,CsvTableRowkey
return Iterables.getFirst(csvParser, null);
}
}
-
- /**
- * A default implementation of {@code ImportPreUpsertKeyValueProcessor} that is used if no
- * specific class is configured. This implementation simply passes through the KeyValue
- * list that is passed in.
- */
- public static class DefaultImportPreUpsertKeyValueProcessor implements
- ImportPreUpsertKeyValueProcessor {
-
- @Override
- public List<KeyValue> preUpsert(byte[] rowKey, List<KeyValue> keyValues) {
- return keyValues;
- }
- }
}
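After this change CsvToKeyValueMapper only supplies the CSV-specific pieces: a CsvLineParser implementing the new LineParser<T> contract, and a CsvUpsertExecutor. The contract itself is simple: one input line in, one typed record out, with null signalling an empty or unparseable line. A self-contained sketch restating that contract for an invented pipe-delimited format (the interface appears to ship nested in FormatToKeyValueMapper in this patch; the names below are illustrative only):

import java.io.IOException;
import java.util.List;

import com.google.common.base.Splitter;
import com.google.common.collect.Lists;

public class PipeDelimitedLineParserExample {

    // Restated for illustration: the shape of the parser returned by
    // FormatToKeyValueMapper.getLineParser().
    interface LineParser<T> {
        T parse(String input) throws IOException;
    }

    static class PipeDelimitedLineParser implements LineParser<List<String>> {
        @Override
        public List<String> parse(String input) throws IOException {
            if (input == null || input.isEmpty()) {
                return null; // mirrors how an empty line is counted as an empty record by the mapper
            }
            return Lists.newArrayList(Splitter.on('|').trimResults().split(input));
        }
    }

    public static void main(String[] args) throws IOException {
        List<String> fields = new PipeDelimitedLineParser().parse("1|John|Doe");
        System.out.println(fields); // prints [1, John, Doe]
    }
}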
http://git-wip-us.apache.org/repos/asf/phoenix/blob/578979a1/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueReducer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueReducer.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueReducer.java
deleted file mode 100644
index 7e9c4fd..0000000
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueReducer.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.mapreduce;
-
-import java.io.IOException;
-import java.util.TreeSet;
-
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.phoenix.mapreduce.bulkload.CsvTableRowkeyPair;
-
-/**
- * Reducer class for the CSVBulkLoad job.
- * Performs similar functionality to {@link KeyValueSortReducer}
- *
- */
-public class CsvToKeyValueReducer extends Reducer<CsvTableRowkeyPair,KeyValue,CsvTableRowkeyPair,KeyValue> {
-
- @Override
- protected void reduce(CsvTableRowkeyPair key, Iterable<KeyValue> values,
- Reducer<CsvTableRowkeyPair, KeyValue, CsvTableRowkeyPair, KeyValue>.Context context)
- throws IOException, InterruptedException {
- TreeSet<KeyValue> map = new TreeSet<KeyValue>(KeyValue.COMPARATOR);
- for (KeyValue kv: values) {
- try {
- map.add(kv.clone());
- } catch (CloneNotSupportedException e) {
- throw new java.io.IOException(e);
- }
- }
- context.setStatus("Read " + map.getClass());
- int index = 0;
- for (KeyValue kv: map) {
- context.write(key, kv);
- if (++index % 100 == 0) context.setStatus("Wrote " + index);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/578979a1/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueMapper.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueMapper.java
new file mode 100644
index 0000000..b2e99e5
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueMapper.java
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import javax.annotation.Nullable;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.mapreduce.bulkload.TableRowkeyPair;
+import org.apache.phoenix.mapreduce.bulkload.TargetTableRefFunctions;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.util.ColumnInfo;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.UpsertExecutor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.base.Splitter;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+
+/**
+ * Base class for converting some input source format into {@link KeyValue}s of a target
+ * schema. Assumes input format is text-based, with one row per line. Depends on an online cluster
+ * to retrieve {@link ColumnInfo} from the target table.
+ */
+public abstract class FormatToKeyValueMapper<RECORD> extends Mapper<LongWritable, Text, TableRowkeyPair,
+ KeyValue> {
+
+ protected static final Logger LOG = LoggerFactory.getLogger(FormatToKeyValueMapper.class);
+
+ protected static final String COUNTER_GROUP_NAME = "Phoenix MapReduce Import";
+
+ /** Configuration key for the name of the output table */
+ public static final String TABLE_NAME_CONFKEY = "phoenix.mapreduce.import.tablename";
+
+ /** Configuration key for the name of the output index table */
+ public static final String INDEX_TABLE_NAME_CONFKEY = "phoenix.mapreduce.import.indextablename";
+
+ /** Configuration key for the columns to be imported */
+ public static final String COLUMN_INFO_CONFKEY = "phoenix.mapreduce.import.columninfos";
+
+ /** Configuration key for the flag to ignore invalid rows */
+ public static final String IGNORE_INVALID_ROW_CONFKEY = "phoenix.mapreduce.import.ignoreinvalidrow";
+
+ /** Configuration key for the table names */
+ public static final String TABLE_NAMES_CONFKEY = "phoenix.mapreduce.import.tablenames";
+
+ /** Configuration key for the table configurations */
+ public static final String TABLE_CONFIG_CONFKEY = "phoenix.mapreduce.import.table.config";
+
+ /**
+ * Parses a single input line, returning a {@code T}.
+ */
+ public interface LineParser<T> {
+ T parse(String input) throws IOException;
+ }
+
+ protected PhoenixConnection conn;
+ protected UpsertExecutor<RECORD, ?> upsertExecutor;
+ protected ImportPreUpsertKeyValueProcessor preUpdateProcessor;
+ protected List<String> tableNames;
+ protected MapperUpsertListener<RECORD> upsertListener;
+
+ protected abstract UpsertExecutor<RECORD,?> buildUpsertExecutor(Configuration conf);
+ protected abstract LineParser<RECORD> getLineParser();
+
+ @Override
+ protected void setup(Context context) throws IOException, InterruptedException {
+
+ Configuration conf = context.getConfiguration();
+
+ // pass client configuration into driver
+ Properties clientInfos = new Properties();
+ for (Map.Entry<String, String> entry : conf) {
+ clientInfos.setProperty(entry.getKey(), entry.getValue());
+ }
+
+ try {
+ conn = (PhoenixConnection) QueryUtil.getConnection(clientInfos, conf);
+ } catch (SQLException | ClassNotFoundException e) {
+ throw new RuntimeException(e);
+ }
+
+ final String tableNamesConf = conf.get(TABLE_NAMES_CONFKEY);
+ tableNames = TargetTableRefFunctions.NAMES_FROM_JSON.apply(tableNamesConf);
+
+ upsertListener = new MapperUpsertListener<RECORD>(
+ context, conf.getBoolean(IGNORE_INVALID_ROW_CONFKEY, true));
+ upsertExecutor = buildUpsertExecutor(conf);
+ preUpdateProcessor = PhoenixConfigurationUtil.loadPreUpsertProcessor(conf);
+ }
+
+ @SuppressWarnings("deprecation")
+ @Override
+ protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
+ try {
+ RECORD record = null;
+ try {
+ record = getLineParser().parse(value.toString());
+ } catch (IOException e) {
+ context.getCounter(COUNTER_GROUP_NAME, "Parser errors").increment(1L);
+ return;
+ }
+
+ if (record == null) {
+ context.getCounter(COUNTER_GROUP_NAME, "Empty records").increment(1L);
+ return;
+ }
+ upsertExecutor.execute(ImmutableList.<RECORD>of(record));
+
+ Iterator<Pair<byte[], List<KeyValue>>> uncommittedDataIterator
+ = PhoenixRuntime.getUncommittedDataIterator(conn, true);
+ while (uncommittedDataIterator.hasNext()) {
+ Pair<byte[], List<KeyValue>> kvPair = uncommittedDataIterator.next();
+ List<KeyValue> keyValueList = kvPair.getSecond();
+ keyValueList = preUpdateProcessor.preUpsert(kvPair.getFirst(), keyValueList);
+ byte[] first = kvPair.getFirst();
+ for (String tableName : tableNames) {
+ if (Bytes.compareTo(Bytes.toBytes(tableName), first) != 0) {
+ // skip edits for other tables
+ continue;
+ }
+ for (KeyValue kv : keyValueList) {
+ ImmutableBytesWritable outputKey = new ImmutableBytesWritable();
+ outputKey.set(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength());
+ context.write(new TableRowkeyPair(tableName, outputKey), kv);
+ }
+ }
+ }
+ conn.rollback();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ protected void cleanup(Context context) throws IOException, InterruptedException {
+ try {
+ conn.close();
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Write the list of to-import columns to a job configuration.
+ *
+ * @param conf configuration to be written to
+ * @param columnInfoList list of ColumnInfo objects to be configured for import
+ */
+ @VisibleForTesting
+ static void configureColumnInfoList(Configuration conf, List<ColumnInfo> columnInfoList) {
+ conf.set(COLUMN_INFO_CONFKEY, Joiner.on("|").useForNull("").join(columnInfoList));
+ }
+
+ /**
+ * Build the list of ColumnInfos for the import based on information in the configuration.
+ */
+ @VisibleForTesting
+ static List<ColumnInfo> buildColumnInfoList(Configuration conf) {
+
+ return Lists.newArrayList(
+ Iterables.transform(
+ Splitter.on("|").split(conf.get(COLUMN_INFO_CONFKEY)),
+ new Function<String, ColumnInfo>() {
+ @Nullable
+ @Override
+ public ColumnInfo apply(@Nullable String input) {
+ if (input == null || input.isEmpty()) {
+ // An empty string represents a null that was passed in to
+ // the configuration, which corresponds to an input column
+ // which is to be skipped
+ return null;
+ }
+ return ColumnInfo.fromString(input);
+ }
+ }));
+ }
+
+ /**
+ * Listener that logs successful upserts and errors to job counters.
+ */
+ @VisibleForTesting
+ static class MapperUpsertListener<T> implements UpsertExecutor.UpsertListener<T> {
+
+ private final Mapper<LongWritable, Text, TableRowkeyPair, KeyValue>.Context context;
+ private final boolean ignoreRecordErrors;
+
+ private MapperUpsertListener(
+ Mapper<LongWritable, Text, TableRowkeyPair, KeyValue>.Context context,
+ boolean ignoreRecordErrors) {
+ this.context = context;
+ this.ignoreRecordErrors = ignoreRecordErrors;
+ }
+
+ @Override
+ public void upsertDone(long upsertCount) {
+ context.getCounter(COUNTER_GROUP_NAME, "Upserts Done").increment(1L);
+ }
+
+ @Override
+ public void errorOnRecord(T record, Throwable throwable) {
+ LOG.error("Error on record " + record, throwable);
+ context.getCounter(COUNTER_GROUP_NAME, "Errors on records").increment(1L);
+ if (!ignoreRecordErrors) {
+ throw Throwables.propagate(throwable);
+ }
+ }
+ }
+
+ /**
+ * A default implementation of {@code ImportPreUpsertKeyValueProcessor} that is used if no
+ * specific class is configured. This implementation simply passes through the KeyValue
+ * list that is passed in.
+ */
+ public static class DefaultImportPreUpsertKeyValueProcessor implements
+ ImportPreUpsertKeyValueProcessor {
+
+ @Override
+ public List<KeyValue> preUpsert(byte[] rowKey, List<KeyValue> keyValues) {
+ return keyValues;
+ }
+ }
+}
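
The new base class keeps format handling behind two hooks: getLineParser() turns a text line into a RECORD, and buildUpsertExecutor() turns RECORDs into UPSERTs. A minimal sketch of just the parsing side, for a hypothetical tab-separated format (the class and delimiter below are invented for illustration, not part of the commit):

    import java.io.IOException;
    import java.util.Arrays;
    import java.util.List;

    // Illustrative only: a LineParser for a hypothetical tab-delimited input,
    // showing the shape a new format plugs into FormatToKeyValueMapper with.
    class TsvLineParser implements FormatToKeyValueMapper.LineParser<List<String>> {
        @Override
        public List<String> parse(String input) throws IOException {
            if (input == null || input.isEmpty()) {
                return null; // null records are counted as "Empty records" by the mapper
            }
            return Arrays.asList(input.split("\t", -1));
        }
    }
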
http://git-wip-us.apache.org/repos/asf/phoenix/blob/578979a1/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java
new file mode 100644
index 0000000..5d00656
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import java.io.IOException;
+import java.util.TreeSet;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.phoenix.mapreduce.bulkload.TableRowkeyPair;
+
+/**
+ * Reducer class for the bulkload jobs.
+ * Performs similar functionality to {@link KeyValueSortReducer}
+ */
+public class FormatToKeyValueReducer
+ extends Reducer<TableRowkeyPair,KeyValue,TableRowkeyPair,KeyValue> {
+
+ @Override
+ protected void reduce(TableRowkeyPair key, Iterable<KeyValue> values,
+ Reducer<TableRowkeyPair, KeyValue, TableRowkeyPair, KeyValue>.Context context)
+ throws IOException, InterruptedException {
+ TreeSet<KeyValue> map = new TreeSet<KeyValue>(KeyValue.COMPARATOR);
+ for (KeyValue kv: values) {
+ try {
+ map.add(kv.clone());
+ } catch (CloneNotSupportedException e) {
+ throw new java.io.IOException(e);
+ }
+ }
+ context.setStatus("Read " + map.getClass());
+ int index = 0;
+ for (KeyValue kv: map) {
+ context.write(key, kv);
+ if (++index % 100 == 0) context.setStatus("Wrote " + index);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/578979a1/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/ImportPreUpsertKeyValueProcessor.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/ImportPreUpsertKeyValueProcessor.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/ImportPreUpsertKeyValueProcessor.java
index 62211f3..dff9ef2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/ImportPreUpsertKeyValueProcessor.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/ImportPreUpsertKeyValueProcessor.java
@@ -24,7 +24,8 @@ import java.util.List;
/**
* A listener hook to process KeyValues that are being written to HFiles for bulk import.
* Implementing this interface and configuring it via the {@link
- * CsvToKeyValueMapper#UPSERT_HOOK_CLASS_CONFKEY} configuration key.
+ * org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil#UPSERT_HOOK_CLASS_CONFKEY}
+ * configuration key.
* <p/>
 * The intention of such a hook is to allow coprocessor-style operations to be performed on
* data that is being bulk-loaded via MapReduce.
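
For readers wiring in such a hook, a minimal sketch of an implementation, assuming the configured class is instantiated reflectively via a no-arg constructor; the processing logic below is invented for illustration:

    import java.util.List;

    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.phoenix.mapreduce.ImportPreUpsertKeyValueProcessor;

    import com.google.common.collect.Lists;

    // Illustrative only: a hook that could drop or rewrite KeyValues before
    // they are written to HFiles during bulk import; this one just copies them.
    public class PassThroughPreUpsertProcessor implements ImportPreUpsertKeyValueProcessor {
        @Override
        public List<KeyValue> preUpsert(byte[] rowKey, List<KeyValue> keyValues) {
            // Return a copy rather than mutating the incoming list; the return
            // value replaces the list that is written out.
            return Lists.newArrayList(keyValues);
        }
    }
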
http://git-wip-us.apache.org/repos/asf/phoenix/blob/578979a1/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/JsonBulkLoadTool.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/JsonBulkLoadTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/JsonBulkLoadTool.java
new file mode 100644
index 0000000..1bea3f0
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/JsonBulkLoadTool.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.phoenix.util.ColumnInfo;
+
+/**
+ * A tool for running MapReduce-based ingests of JSON data. Nested JSON data structures are not
+ * handled, though lists are converted into typed ARRAYS.
+ */
+public class JsonBulkLoadTool extends AbstractBulkLoadTool {
+
+ @Override
+ protected void configureOptions(CommandLine cmdLine, List<ColumnInfo> importColumns,
+ Configuration conf) throws SQLException {
+ // noop
+ }
+
+ @Override
+ protected void setupJob(Job job) {
+ // Allow overriding the job jar setting by using a -D system property at startup
+ if (job.getJar() == null) {
+ job.setJarByClass(JsonToKeyValueMapper.class);
+ }
+ job.setMapperClass(JsonToKeyValueMapper.class);
+ }
+
+ public static void main(String[] args) throws Exception {
+ ToolRunner.run(new JsonBulkLoadTool(), args);
+ }
+}
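
The tool is launched through Hadoop's ToolRunner, as with the existing CSV tool. A hedged driver sketch, assuming the --table/--input options of the CSV bulk load tool carry over via the shared AbstractBulkLoadTool base; the table name and input path are placeholders:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.util.ToolRunner;
    import org.apache.phoenix.mapreduce.JsonBulkLoadTool;

    public class JsonBulkLoadDriver {
        public static void main(String[] args) throws Exception {
            Configuration conf = HBaseConfiguration.create();
            // Assumed options: --table and --input, inherited from the CSV bulk load tool.
            int exitCode = ToolRunner.run(conf, new JsonBulkLoadTool(),
                    new String[] { "--table", "EXAMPLE", "--input", "/tmp/example.json" });
            System.exit(exitCode);
        }
    }
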
http://git-wip-us.apache.org/repos/asf/phoenix/blob/578979a1/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/JsonToKeyValueMapper.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/JsonToKeyValueMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/JsonToKeyValueMapper.java
new file mode 100644
index 0000000..5173a0e
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/JsonToKeyValueMapper.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.phoenix.util.ColumnInfo;
+import org.apache.phoenix.util.UpsertExecutor;
+import org.apache.phoenix.util.json.JsonUpsertExecutor;
+import org.codehaus.jackson.map.ObjectMapper;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+/**
+ * MapReduce mapper that converts JSON input lines into KeyValues that can be written to HFiles.
+ * <p/>
+ * KeyValues are produced by executing UPSERT statements on a Phoenix connection and then
+ * extracting the created KeyValues and rolling back the statement execution before it is
+ * committed to HBase.
+ */
+public class JsonToKeyValueMapper extends FormatToKeyValueMapper<Map<?, ?>> {
+
+ private LineParser<Map<?, ?>> lineParser;
+
+ @Override
+ protected LineParser<Map<?, ?>> getLineParser() {
+ return lineParser;
+ }
+
+ @Override
+ protected void setup(Context context) throws IOException, InterruptedException {
+ super.setup(context);
+ lineParser = new JsonLineParser();
+ }
+
+ @VisibleForTesting
+ @Override
+ protected UpsertExecutor<Map<?, ?>, ?> buildUpsertExecutor(Configuration conf) {
+ String tableName = conf.get(TABLE_NAME_CONFKEY);
+ Preconditions.checkNotNull(tableName, "table name is not configured");
+
+ List<ColumnInfo> columnInfoList = buildColumnInfoList(conf);
+
+ return new JsonUpsertExecutor(conn, tableName, columnInfoList, upsertListener);
+ }
+
+ @VisibleForTesting
+ static class JsonLineParser implements LineParser<Map<?, ?>> {
+ private final ObjectMapper mapper = new ObjectMapper();
+
+ @Override
+ public Map<?, ?> parse(String input) throws IOException {
+ return mapper.readValue(input, Map.class);
+ }
+ }
+}
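
JsonLineParser expects one JSON object per input line and maps it with Jackson; nested structures are not handled, while JSON arrays are turned into typed Phoenix ARRAYs by JsonUpsertExecutor. A small sketch of the per-line parse, with an invented sample record:

    import java.util.Map;

    import org.codehaus.jackson.map.ObjectMapper;

    // Illustrative only: one JSON document per line, parsed into a Map exactly
    // as JsonLineParser does; the field names below are made up.
    public class JsonLineExample {
        public static void main(String[] args) throws Exception {
            ObjectMapper mapper = new ObjectMapper();
            String line = "{\"ID\":1,\"NAME\":\"example\",\"TAGS\":[\"a\",\"b\"]}";
            Map<?, ?> record = mapper.readValue(line, Map.class);
            System.out.println(record); // {ID=1, NAME=example, TAGS=[a, b]}
        }
    }
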