Posted to commits@phoenix.apache.org by ma...@apache.org on 2016/03/29 21:50:43 UTC
[10/30] phoenix git commit: PHOENIX-2733 Minor cleanup for improvements to CSV Bulk Loader performance (Sergey Soldatov)
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/952a01cc
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/952a01cc
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/952a01cc
Branch: refs/heads/calcite
Commit: 952a01cce6d7d314ca26e89a398e75f38690cdad
Parents: 1d03e02
Author: James Taylor <jt...@salesforce.com>
Authored: Wed Mar 2 13:54:05 2016 -0800
Committer: James Taylor <jt...@salesforce.com>
Committed: Wed Mar 2 13:54:24 2016 -0800
----------------------------------------------------------------------
.../phoenix/mapreduce/AbstractBulkLoadTool.java | 12 +-
.../phoenix/mapreduce/CsvToKeyValueMapper.java | 2 +-
.../mapreduce/FormatToBytesWritableMapper.java | 385 +++++++++++++++++++
.../mapreduce/FormatToKeyValueMapper.java | 378 ------------------
.../mapreduce/FormatToKeyValueReducer.java | 28 +-
.../phoenix/mapreduce/JsonToKeyValueMapper.java | 2 +-
.../bulkload/TargetTableRefFunctions.java | 2 +-
.../util/PhoenixConfigurationUtil.java | 4 +-
.../FormatToBytesWritableMapperTest.java | 102 +++++
.../mapreduce/FormatToKeyValueMapperTest.java | 102 -----
10 files changed, 516 insertions(+), 501 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/952a01cc/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
index ab2848f..ff73530 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
@@ -209,10 +209,10 @@ public abstract class AbstractBulkLoadTool extends Configured implements Tool {
List<ColumnInfo> importColumns = buildImportColumns(conn, cmdLine, qualifiedTableName);
Preconditions.checkNotNull(importColumns);
Preconditions.checkArgument(!importColumns.isEmpty(), "Column info list is empty");
- FormatToKeyValueMapper.configureColumnInfoList(conf, importColumns);
+ FormatToBytesWritableMapper.configureColumnInfoList(conf, importColumns);
boolean ignoreInvalidRows = cmdLine.hasOption(IGNORE_ERRORS_OPT.getOpt());
- conf.setBoolean(FormatToKeyValueMapper.IGNORE_INVALID_ROW_CONFKEY, ignoreInvalidRows);
- conf.set(FormatToKeyValueMapper.TABLE_NAME_CONFKEY, qualifiedTableName);
+ conf.setBoolean(FormatToBytesWritableMapper.IGNORE_INVALID_ROW_CONFKEY, ignoreInvalidRows);
+ conf.set(FormatToBytesWritableMapper.TABLE_NAME_CONFKEY, qualifiedTableName);
// give subclasses their hook
configureOptions(cmdLine, importColumns, conf);
@@ -277,10 +277,10 @@ public abstract class AbstractBulkLoadTool extends Configured implements Tool {
MultiHfileOutputFormat.configureIncrementalLoad(job, tablesToBeLoaded);
final String tableNamesAsJson = TargetTableRefFunctions.NAMES_TO_JSON.apply(tablesToBeLoaded);
- final String logicalNamesAsJson = TargetTableRefFunctions.LOGICAN_NAMES_TO_JSON.apply(tablesToBeLoaded);
+ final String logicalNamesAsJson = TargetTableRefFunctions.LOGICAL_NAMES_TO_JSON.apply(tablesToBeLoaded);
- job.getConfiguration().set(FormatToKeyValueMapper.TABLE_NAMES_CONFKEY,tableNamesAsJson);
- job.getConfiguration().set(FormatToKeyValueMapper.LOGICAL_NAMES_CONFKEY,logicalNamesAsJson);
+ job.getConfiguration().set(FormatToBytesWritableMapper.TABLE_NAMES_CONFKEY,tableNamesAsJson);
+ job.getConfiguration().set(FormatToBytesWritableMapper.LOGICAL_NAMES_CONFKEY,logicalNamesAsJson);
// give subclasses their hook
setupJob(job);
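
The hunks above thread everything the mapper needs through the job configuration. A minimal sketch of the same wiring done by hand (not part of the patch; assumes a Job named job and a List<TargetTableRef> named tablesToBeLoaded, as in the tool):

    Configuration conf = job.getConfiguration();
    conf.set(FormatToBytesWritableMapper.TABLE_NAME_CONFKEY, "MY_SCHEMA.MY_TABLE");
    conf.setBoolean(FormatToBytesWritableMapper.IGNORE_INVALID_ROW_CONFKEY, true);
    conf.set(FormatToBytesWritableMapper.TABLE_NAMES_CONFKEY,
            TargetTableRefFunctions.NAMES_TO_JSON.apply(tablesToBeLoaded));
    conf.set(FormatToBytesWritableMapper.LOGICAL_NAMES_CONFKEY,
            TargetTableRefFunctions.LOGICAL_NAMES_TO_JSON.apply(tablesToBeLoaded));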
http://git-wip-us.apache.org/repos/asf/phoenix/blob/952a01cc/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java
index 2cb1ac7..e5a6e5b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java
@@ -41,7 +41,7 @@ import com.google.common.collect.Iterables;
* extracting the created KeyValues and rolling back the statement execution before it is
* committed to HBase.
*/
-public class CsvToKeyValueMapper extends FormatToKeyValueMapper<CSVRecord> {
+public class CsvToKeyValueMapper extends FormatToBytesWritableMapper<CSVRecord> {
/** Configuration key for the field delimiter for input csv records */
public static final String FIELD_DELIMITER_CONFKEY = "phoenix.mapreduce.import.fielddelimiter";
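
As the class comment says, the mapper leans on Phoenix itself for the conversion: it executes the UPSERT on an uncommitted connection, harvests the resulting KeyValues, and rolls the statement back so nothing is committed to HBase. A condensed sketch of that sequence as it appears in FormatToBytesWritableMapper.map() below (error handling and per-table grouping omitted):

    upsertExecutor.execute(ImmutableList.<RECORD>of(record));
    Iterator<Pair<byte[], List<KeyValue>>> uncommitted =
            PhoenixRuntime.getUncommittedDataIterator(conn, true);
    // ... group each table's KeyValues and write them out as aggregated rows ...
    conn.rollback();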
http://git-wip-us.apache.org/repos/asf/phoenix/blob/952a01cc/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
new file mode 100644
index 0000000..2c9b6d9
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapper.java
@@ -0,0 +1,385 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.TreeMap;
+import javax.annotation.Nullable;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.mapreduce.bulkload.TableRowkeyPair;
+import org.apache.phoenix.mapreduce.bulkload.TargetTableRefFunctions;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.util.ColumnInfo;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.UpsertExecutor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.base.Splitter;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+
+/**
+ * Base class for converting some input source format into {@link ImmutableBytesWritable}s that
+ * contain the values for all columns packed into a single byte array.
+ * Assumes input format is text-based, with one row per line. Depends on an online cluster
+ * to retrieve {@link ColumnInfo} from the target table.
+ */
+public abstract class FormatToBytesWritableMapper<RECORD> extends Mapper<LongWritable, Text, TableRowkeyPair,
+ ImmutableBytesWritable> {
+
+ protected static final Logger LOG = LoggerFactory.getLogger(FormatToBytesWritableMapper.class);
+
+ protected static final String COUNTER_GROUP_NAME = "Phoenix MapReduce Import";
+
+ /** Configuration key for the name of the output table */
+ public static final String TABLE_NAME_CONFKEY = "phoenix.mapreduce.import.tablename";
+
+ /** Configuration key for the columns to be imported */
+ public static final String COLUMN_INFO_CONFKEY = "phoenix.mapreduce.import.columninfos";
+
+ /** Configuration key for the flag to ignore invalid rows */
+ public static final String IGNORE_INVALID_ROW_CONFKEY = "phoenix.mapreduce.import.ignoreinvalidrow";
+
+ /** Configuration key for the table names */
+ public static final String TABLE_NAMES_CONFKEY = "phoenix.mapreduce.import.tablenames";
+
+ /** Configuration key for the table logical names */
+ public static final String LOGICAL_NAMES_CONFKEY = "phoenix.mapreduce.import.logicalnames";
+
+ /**
+ * Parses a single input line, returning a {@code T}.
+ */
+ public interface LineParser<T> {
+ T parse(String input) throws IOException;
+ }
+
+ protected PhoenixConnection conn;
+ protected UpsertExecutor<RECORD, ?> upsertExecutor;
+ protected ImportPreUpsertKeyValueProcessor preUpdateProcessor;
+ protected List<String> tableNames;
+ protected List<String> logicalNames;
+ protected MapperUpsertListener<RECORD> upsertListener;
+
+ /*
+ lookup table for column indexes; the position in this List matches the position in the tableNames List
+ */
+ protected List<Map<byte[], Map<byte[], Integer>>> columnIndexes;
+
+ protected abstract UpsertExecutor<RECORD,?> buildUpsertExecutor(Configuration conf);
+ protected abstract LineParser<RECORD> getLineParser();
+
+ @Override
+ protected void setup(Context context) throws IOException, InterruptedException {
+
+ Configuration conf = context.getConfiguration();
+
+ // pass client configuration into driver
+ Properties clientInfos = new Properties();
+ for (Map.Entry<String, String> entry : conf) {
+ clientInfos.setProperty(entry.getKey(), entry.getValue());
+ }
+
+ try {
+ conn = (PhoenixConnection) QueryUtil.getConnection(clientInfos, conf);
+
+ final String tableNamesConf = conf.get(TABLE_NAMES_CONFKEY);
+ final String logicalNamesConf = conf.get(LOGICAL_NAMES_CONFKEY);
+ tableNames = TargetTableRefFunctions.NAMES_FROM_JSON.apply(tableNamesConf);
+ logicalNames = TargetTableRefFunctions.NAMES_FROM_JSON.apply(logicalNamesConf);
+
+ columnIndexes = initColumnIndexes();
+ } catch (SQLException | ClassNotFoundException e) {
+ throw new RuntimeException(e);
+ }
+
+ upsertListener = new MapperUpsertListener<RECORD>(
+ context, conf.getBoolean(IGNORE_INVALID_ROW_CONFKEY, true));
+ upsertExecutor = buildUpsertExecutor(conf);
+ preUpdateProcessor = PhoenixConfigurationUtil.loadPreUpsertProcessor(conf);
+ }
+
+ @SuppressWarnings("deprecation")
+ @Override
+ protected void map(LongWritable key, Text value, Context context) throws IOException,
+ InterruptedException {
+ if (conn == null) {
+ throw new RuntimeException("Connection not initialized.");
+ }
+ try {
+ RECORD record = null;
+ try {
+ record = getLineParser().parse(value.toString());
+ } catch (IOException e) {
+ context.getCounter(COUNTER_GROUP_NAME, "Parser errors").increment(1L);
+ return;
+ }
+
+ if (record == null) {
+ context.getCounter(COUNTER_GROUP_NAME, "Empty records").increment(1L);
+ return;
+ }
+ upsertExecutor.execute(ImmutableList.<RECORD>of(record));
+ Map<Integer, List<KeyValue>> map = new HashMap<>();
+ Iterator<Pair<byte[], List<KeyValue>>> uncommittedDataIterator
+ = PhoenixRuntime.getUncommittedDataIterator(conn, true);
+ while (uncommittedDataIterator.hasNext()) {
+ Pair<byte[], List<KeyValue>> kvPair = uncommittedDataIterator.next();
+ List<KeyValue> keyValueList = kvPair.getSecond();
+ keyValueList = preUpdateProcessor.preUpsert(kvPair.getFirst(), keyValueList);
+ byte[] first = kvPair.getFirst();
+ // Create a list of KV for each table
+ for (int i = 0; i < tableNames.size(); i++) {
+ if (Bytes.compareTo(Bytes.toBytes(tableNames.get(i)), first) == 0) {
+ if (!map.containsKey(i)) {
+ map.put(i, new ArrayList<KeyValue>());
+ }
+ List<KeyValue> list = map.get(i);
+ for (KeyValue kv : keyValueList) {
+ list.add(kv);
+ }
+ break;
+ }
+ }
+ }
+ for (Map.Entry<Integer, List<KeyValue>> rowEntry : map.entrySet()) {
+ int tableIndex = rowEntry.getKey();
+ List<KeyValue> lkv = rowEntry.getValue();
+ // All KVs are combined into a single byte array
+ writeAggregatedRow(context, tableIndex, lkv);
+ }
+ conn.rollback();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private List<Map<byte[], Map<byte[], Integer>>> initColumnIndexes() throws SQLException {
+ List<Map<byte[], Map<byte[], Integer>>> tableMap = new ArrayList<>();
+ int tableIndex;
+ for (tableIndex = 0; tableIndex < tableNames.size(); tableIndex++) {
+ PTable table = PhoenixRuntime.getTable(conn, logicalNames.get(tableIndex));
+ Map<byte[], Map<byte[], Integer>> columnMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+ List<PColumn> cls = table.getColumns();
+ for (int i = 0; i < cls.size(); i++) {
+ PColumn c = cls.get(i);
+ if (c.getFamilyName() == null) continue; // Skip PK column
+ byte[] family = c.getFamilyName().getBytes();
+ byte[] name = c.getName().getBytes();
+ if (!columnMap.containsKey(family)) {
+ columnMap.put(family, new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR));
+ }
+ Map<byte[], Integer> qualifier = columnMap.get(family);
+ qualifier.put(name, i);
+ }
+ tableMap.add(columnMap);
+ }
+ return tableMap;
+ }
+
+ /**
+ * Finds the column index that will replace the column name in
+ * the aggregated array; the name is restored from this index in the Reducer.
+ *
+ * @param tableIndex Table index in tableNames list
+ * @param cell KeyValue for the column
+ * @return column index for the specified cell, or -1 if it was not found
+ */
+ private int findIndex(int tableIndex, Cell cell) {
+ Map<byte[], Map<byte[], Integer>> columnMap = columnIndexes.get(tableIndex);
+ Map<byte[], Integer> qualifiers = columnMap.get(Bytes.copy(cell.getFamilyArray(),
+ cell.getFamilyOffset(), cell.getFamilyLength()));
+ if (qualifiers != null) {
+ Integer result = qualifiers.get(Bytes.copy(cell.getQualifierArray(),
+ cell.getQualifierOffset(), cell.getQualifierLength()));
+ if (result != null) {
+ return result;
+ }
+ }
+ return -1;
+ }
+
+ /**
+ * Collect all column values for the same rowKey
+ *
+ * @param context Current mapper context
+ * @param tableIndex Table index in tableNames list
+ * @param lkv List of KVs that will be combined into a single ImmutableBytesWritable
+ * @throws IOException
+ * @throws InterruptedException
+ */
+
+ private void writeAggregatedRow(Context context, int tableIndex, List<KeyValue> lkv)
+ throws IOException, InterruptedException {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream(1024);
+ DataOutputStream outputStream = new DataOutputStream(bos);
+ ImmutableBytesWritable outputKey = new ImmutableBytesWritable();
+ if (!lkv.isEmpty()) {
+ // All KeyValues in lkv share the same row key, so initialize the rowKey only once
+ Cell first = lkv.get(0);
+ outputKey.set(first.getRowArray(), first.getRowOffset(), first.getRowLength());
+ for (KeyValue cell : lkv) {
+ if (isEmptyCell(cell)) {
+ continue;
+ }
+ int i = findIndex(tableIndex, cell);
+ if (i == -1) {
+ throw new IOException("No column found for KeyValue");
+ }
+ WritableUtils.writeVInt(outputStream, i);
+ outputStream.writeByte(cell.getTypeByte());
+ WritableUtils.writeVInt(outputStream, cell.getValueLength());
+ outputStream.write(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
+ }
+ }
+ ImmutableBytesWritable aggregatedArray = new ImmutableBytesWritable(bos.toByteArray());
+ outputStream.close();
+ context.write(new TableRowkeyPair(tableNames.get(tableIndex), outputKey), aggregatedArray);
+ }
+
+ protected boolean isEmptyCell(KeyValue cell) {
+ if (Bytes.compareTo(cell.getQualifierArray(), cell.getQualifierOffset(),
+ cell.getQualifierLength(), QueryConstants.EMPTY_COLUMN_BYTES, 0,
+ QueryConstants.EMPTY_COLUMN_BYTES.length) != 0)
+ return false;
+ else
+ return true;
+ }
+
+
+ @Override
+ protected void cleanup(Context context) throws IOException, InterruptedException {
+ try {
+ if (conn != null) {
+ conn.close();
+ }
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Write the list of to-import columns to a job configuration.
+ *
+ * @param conf configuration to be written to
+ * @param columnInfoList list of ColumnInfo objects to be configured for import
+ */
+ @VisibleForTesting
+ static void configureColumnInfoList(Configuration conf, List<ColumnInfo> columnInfoList) {
+ conf.set(COLUMN_INFO_CONFKEY, Joiner.on("|").useForNull("").join(columnInfoList));
+ }
+
+ /**
+ * Build the list of ColumnInfos for the import based on information in the configuration.
+ */
+ @VisibleForTesting
+ static List<ColumnInfo> buildColumnInfoList(Configuration conf) {
+
+ return Lists.newArrayList(
+ Iterables.transform(
+ Splitter.on("|").split(conf.get(COLUMN_INFO_CONFKEY)),
+ new Function<String, ColumnInfo>() {
+ @Nullable
+ @Override
+ public ColumnInfo apply(@Nullable String input) {
+ if (input == null || input.isEmpty()) {
+ // An empty string represents a null that was passed in to
+ // the configuration, which corresponds to an input column
+ // that should be skipped
+ return null;
+ }
+ return ColumnInfo.fromString(input);
+ }
+ }));
+ }
+
+ /**
+ * Listener that logs successful upserts and errors to job counters.
+ */
+ @VisibleForTesting
+ static class MapperUpsertListener<T> implements UpsertExecutor.UpsertListener<T> {
+
+ private final Mapper<LongWritable, Text,
+ TableRowkeyPair, ImmutableBytesWritable>.Context context;
+ private final boolean ignoreRecordErrors;
+
+ private MapperUpsertListener(
+ Mapper<LongWritable, Text, TableRowkeyPair, ImmutableBytesWritable>.Context context,
+ boolean ignoreRecordErrors) {
+ this.context = context;
+ this.ignoreRecordErrors = ignoreRecordErrors;
+ }
+
+ @Override
+ public void upsertDone(long upsertCount) {
+ context.getCounter(COUNTER_GROUP_NAME, "Upserts Done").increment(1L);
+ }
+
+ @Override
+ public void errorOnRecord(T record, Throwable throwable) {
+ LOG.error("Error on record " + record, throwable);
+ context.getCounter(COUNTER_GROUP_NAME, "Errors on records").increment(1L);
+ if (!ignoreRecordErrors) {
+ throw Throwables.propagate(throwable);
+ }
+ }
+ }
+
+ /**
+ * A default implementation of {@code ImportPreUpsertKeyValueProcessor} that is used if no
+ * specific class is configured. This implementation simply passes through the KeyValue
+ * list that is passed in.
+ */
+ public static class DefaultImportPreUpsertKeyValueProcessor implements
+ ImportPreUpsertKeyValueProcessor {
+
+ @Override
+ public List<KeyValue> preUpsert(byte[] rowKey, List<KeyValue> keyValues) {
+ return keyValues;
+ }
+ }
+}
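
The payload writeAggregatedRow() builds above is, per non-empty cell: a vint column index, the KeyValue type byte, a vint value length, then the raw value bytes, all keyed by a TableRowkeyPair. A sketch of reading one such row back (this mirrors the loop in FormatToKeyValueReducer further down; aggregatedArray is assumed to be one ImmutableBytesWritable emitted by the mapper):

    DataInputStream input = new DataInputStream(
            new ByteArrayInputStream(aggregatedArray.get()));
    while (input.available() != 0) {
        int columnIndex = WritableUtils.readVInt(input); // maps back to family/qualifier
        byte type = input.readByte();                    // KeyValue type byte
        int valueLength = WritableUtils.readVInt(input); // value size
        byte[] value = new byte[valueLength];
        input.readFully(value);
        // ... rebuild a KeyValue from the row key plus (family, qualifier, type, value)
    }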
http://git-wip-us.apache.org/repos/asf/phoenix/blob/952a01cc/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueMapper.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueMapper.java
deleted file mode 100644
index 8dbb4aa..0000000
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueMapper.java
+++ /dev/null
@@ -1,378 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.mapreduce;
-
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.sql.SQLException;
-import java.util.*;
-import javax.annotation.Nullable;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.phoenix.jdbc.PhoenixConnection;
-import org.apache.phoenix.mapreduce.bulkload.TableRowkeyPair;
-import org.apache.phoenix.mapreduce.bulkload.TargetTableRefFunctions;
-import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
-import org.apache.phoenix.query.QueryConstants;
-import org.apache.phoenix.schema.PColumn;
-import org.apache.phoenix.schema.PTable;
-import org.apache.phoenix.util.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.base.Joiner;
-import com.google.common.base.Splitter;
-import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-
-/**
- * Base class for converting some input source format into {@link KeyValue}s of a target
- * schema. Assumes input format is text-based, with one row per line. Depends on an online cluster
- * to retrieve {@link ColumnInfo} from the target table.
- */
-public abstract class FormatToKeyValueMapper<RECORD> extends Mapper<LongWritable, Text, TableRowkeyPair,
- ImmutableBytesWritable> {
-
- protected static final Logger LOG = LoggerFactory.getLogger(FormatToKeyValueMapper.class);
-
- protected static final String COUNTER_GROUP_NAME = "Phoenix MapReduce Import";
-
- /** Configuration key for the name of the output table */
- public static final String TABLE_NAME_CONFKEY = "phoenix.mapreduce.import.tablename";
-
- /** Configuration key for the name of the output index table */
- public static final String INDEX_TABLE_NAME_CONFKEY = "phoenix.mapreduce.import.indextablename";
-
- /** Configuration key for the columns to be imported */
- public static final String COLUMN_INFO_CONFKEY = "phoenix.mapreduce.import.columninfos";
-
- /** Configuration key for the flag to ignore invalid rows */
- public static final String IGNORE_INVALID_ROW_CONFKEY = "phoenix.mapreduce.import.ignoreinvalidrow";
-
- /** Configuration key for the table names */
- public static final String TABLE_NAMES_CONFKEY = "phoenix.mapreduce.import.tablenames";
- public static final String LOGICAL_NAMES_CONFKEY = "phoenix.mapreduce.import.logicalnames";
-
- /** Configuration key for the table configurations */
- public static final String TABLE_CONFIG_CONFKEY = "phoenix.mapreduce.import.table.config";
-
- /**
- * Parses a single input line, returning a {@code T}.
- */
- public interface LineParser<T> {
- T parse(String input) throws IOException;
- }
-
- protected PhoenixConnection conn;
- protected UpsertExecutor<RECORD, ?> upsertExecutor;
- protected ImportPreUpsertKeyValueProcessor preUpdateProcessor;
- protected List<String> tableNames;
- protected List<String> logicalNames;
- protected MapperUpsertListener<RECORD> upsertListener;
-
- /*
- lookup table for column index. Index in the List matches to the index in tableNames List
- */
- protected List<Map<byte[], Map<byte[], Integer>>> columnIndexes;
-
- protected abstract UpsertExecutor<RECORD,?> buildUpsertExecutor(Configuration conf);
- protected abstract LineParser<RECORD> getLineParser();
-
- @Override
- protected void setup(Context context) throws IOException, InterruptedException {
-
- Configuration conf = context.getConfiguration();
-
- // pass client configuration into driver
- Properties clientInfos = new Properties();
- for (Map.Entry<String, String> entry : conf) {
- clientInfos.setProperty(entry.getKey(), entry.getValue());
- }
-
- try {
- conn = (PhoenixConnection) QueryUtil.getConnection(clientInfos, conf);
-
- final String tableNamesConf = conf.get(TABLE_NAMES_CONFKEY);
- final String logicalNamesConf = conf.get(LOGICAL_NAMES_CONFKEY);
- tableNames = TargetTableRefFunctions.NAMES_FROM_JSON.apply(tableNamesConf);
- logicalNames = TargetTableRefFunctions.NAMES_FROM_JSON.apply(logicalNamesConf);
-
- columnIndexes = initColumnIndexes();
- } catch (SQLException | ClassNotFoundException e) {
- throw new RuntimeException(e);
- }
-
- upsertListener = new MapperUpsertListener<RECORD>(
- context, conf.getBoolean(IGNORE_INVALID_ROW_CONFKEY, true));
- upsertExecutor = buildUpsertExecutor(conf);
- preUpdateProcessor = PhoenixConfigurationUtil.loadPreUpsertProcessor(conf);
- }
-
- @SuppressWarnings("deprecation")
- @Override
- protected void map(LongWritable key, Text value, Context context) throws IOException,
- InterruptedException {
- if (conn == null) {
- throw new RuntimeException("Connection not initialized.");
- }
- try {
- RECORD record = null;
- try {
- record = getLineParser().parse(value.toString());
- } catch (IOException e) {
- context.getCounter(COUNTER_GROUP_NAME, "Parser errors").increment(1L);
- return;
- }
-
- if (record == null) {
- context.getCounter(COUNTER_GROUP_NAME, "Empty records").increment(1L);
- return;
- }
- upsertExecutor.execute(ImmutableList.<RECORD>of(record));
- Map<Integer, List<KeyValue>> map = new HashMap<>();
- Iterator<Pair<byte[], List<KeyValue>>> uncommittedDataIterator
- = PhoenixRuntime.getUncommittedDataIterator(conn, true);
- while (uncommittedDataIterator.hasNext()) {
- Pair<byte[], List<KeyValue>> kvPair = uncommittedDataIterator.next();
- List<KeyValue> keyValueList = kvPair.getSecond();
- keyValueList = preUpdateProcessor.preUpsert(kvPair.getFirst(), keyValueList);
- byte[] first = kvPair.getFirst();
- // Create a list of KV for each table
- for (int i = 0; i < tableNames.size(); i++) {
- if (Bytes.compareTo(Bytes.toBytes(tableNames.get(i)), first) == 0) {
- if (!map.containsKey(i)) {
- map.put(i, new ArrayList<KeyValue>());
- }
- List<KeyValue> list = map.get(i);
- for (KeyValue kv : keyValueList) {
- list.add(kv);
- }
- break;
- }
- }
- }
- for(Map.Entry<Integer, List<KeyValue>> rowEntry : map.entrySet()) {
- int tableIndex = rowEntry.getKey();
- List<KeyValue> lkv = rowEntry.getValue();
- // All KV values combines to a single byte array
- writeAggregatedRow(context, tableIndex, lkv);
- }
- conn.rollback();
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- private List<Map<byte[],Map<byte[], Integer>>> initColumnIndexes() throws SQLException {
- List<Map<byte[], Map<byte[], Integer>>> tableMap = new ArrayList<>();
- int tableIndex;
- for (tableIndex = 0; tableIndex < tableNames.size(); tableIndex++) {
- PTable table = PhoenixRuntime.getTable(conn, logicalNames.get(tableIndex));
- Map<byte[], Map<byte[], Integer>> columnMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
- List<PColumn> cls = table.getColumns();
- for(int i = 0; i < cls.size(); i++) {
- PColumn c = cls.get(i);
- if(c.getFamilyName() == null) continue; // Skip PK column
- byte[] family = c.getFamilyName().getBytes();
- byte[] name = c.getName().getBytes();
- if(!columnMap.containsKey(family)) {
- columnMap.put(family, new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR));
- }
- Map<byte[], Integer> qualifier = columnMap.get(family);
- qualifier.put(name, i);
- }
- tableMap.add(columnMap);
- }
- return tableMap;
- }
-
- /**
- * Find the column index which will replace the column name in
- * the aggregated array and will be restored in Reducer
- *
- * @param tableIndex Table index in tableNames list
- * @param cell KeyValue for the column
- * @return column index for the specified cell or -1 if was not found
- */
- private int findIndex(int tableIndex, Cell cell) {
- Map<byte[], Map<byte[], Integer>> columnMap = columnIndexes.get(tableIndex);
- Map<byte[], Integer> qualifiers = columnMap.get(Bytes.copy(cell.getFamilyArray(),
- cell.getFamilyOffset(), cell.getFamilyLength()));
- if(qualifiers!= null) {
- Integer result = qualifiers.get(Bytes.copy(cell.getQualifierArray(),
- cell.getQualifierOffset(), cell.getQualifierLength()));
- if(result!=null) {
- return result;
- }
- }
- return -1;
- }
-
- /**
- * Collect all column values for the same rowKey
- *
- * @param context Current mapper context
- * @param tableIndex Table index in tableNames list
- * @param lkv List of KV values that will be combined in a single ImmutableBytesWritable
- * @throws IOException
- * @throws InterruptedException
- */
-
- private void writeAggregatedRow(Context context, int tableIndex, List<KeyValue> lkv)
- throws IOException, InterruptedException {
- TrustedByteArrayOutputStream bos = new TrustedByteArrayOutputStream(1024);
- DataOutputStream outputStream = new DataOutputStream(bos);
- ImmutableBytesWritable outputKey = new ImmutableBytesWritable();
- if (!lkv.isEmpty()) {
- // All Key Values for the same row are supposed to be the same, so init rowKey only once
- Cell first = lkv.get(0);
- outputKey.set(first.getRowArray(), first.getRowOffset(), first.getRowLength());
- for (KeyValue cell : lkv) {
- if(isEmptyCell(cell)) {
- continue;
- }
- int i = findIndex(tableIndex, cell);
- if (i == -1) {
- throw new IOException("No column found for KeyValue");
- }
- WritableUtils.writeVInt(outputStream, i);
- outputStream.writeByte(cell.getTypeByte());
- WritableUtils.writeVInt(outputStream, cell.getValueLength());
- outputStream.write(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
- }
- }
- ImmutableBytesWritable aggregatedArray = new ImmutableBytesWritable(bos.toByteArray());
- outputStream.close();
- context.write(new TableRowkeyPair(tableNames.get(tableIndex), outputKey), aggregatedArray);
- }
-
- protected boolean isEmptyCell(KeyValue cell) {
- if (Bytes.compareTo(cell.getQualifierArray(), cell.getQualifierOffset(),
- cell.getQualifierLength(), QueryConstants.EMPTY_COLUMN_BYTES, 0,
- QueryConstants.EMPTY_COLUMN_BYTES.length) != 0)
- return false;
- else
- return true;
- }
-
-
- @Override
- protected void cleanup(Context context) throws IOException, InterruptedException {
- try {
- if (conn != null) {
- conn.close();
- }
- } catch (SQLException e) {
- throw new RuntimeException(e);
- }
- }
-
- /**
- * Write the list of to-import columns to a job configuration.
- *
- * @param conf configuration to be written to
- * @param columnInfoList list of ColumnInfo objects to be configured for import
- */
- @VisibleForTesting
- static void configureColumnInfoList(Configuration conf, List<ColumnInfo> columnInfoList) {
- conf.set(COLUMN_INFO_CONFKEY, Joiner.on("|").useForNull("").join(columnInfoList));
- }
-
- /**
- * Build the list of ColumnInfos for the import based on information in the configuration.
- */
- @VisibleForTesting
- static List<ColumnInfo> buildColumnInfoList(Configuration conf) {
-
- return Lists.newArrayList(
- Iterables.transform(
- Splitter.on("|").split(conf.get(COLUMN_INFO_CONFKEY)),
- new Function<String, ColumnInfo>() {
- @Nullable
- @Override
- public ColumnInfo apply(@Nullable String input) {
- if (input == null || input.isEmpty()) {
- // An empty string represents a null that was passed in to
- // the configuration, which corresponds to an input column
- // which is to be skipped
- return null;
- }
- return ColumnInfo.fromString(input);
- }
- }));
- }
-
- /**
- * Listener that logs successful upserts and errors to job counters.
- */
- @VisibleForTesting
- static class MapperUpsertListener<T> implements UpsertExecutor.UpsertListener<T> {
-
- private final Mapper<LongWritable, Text,
- TableRowkeyPair, ImmutableBytesWritable>.Context context;
- private final boolean ignoreRecordErrors;
-
- private MapperUpsertListener(
- Mapper<LongWritable, Text, TableRowkeyPair, ImmutableBytesWritable>.Context context,
- boolean ignoreRecordErrors) {
- this.context = context;
- this.ignoreRecordErrors = ignoreRecordErrors;
- }
-
- @Override
- public void upsertDone(long upsertCount) {
- context.getCounter(COUNTER_GROUP_NAME, "Upserts Done").increment(1L);
- }
-
- @Override
- public void errorOnRecord(T record, Throwable throwable) {
- LOG.error("Error on record " + record, throwable);
- context.getCounter(COUNTER_GROUP_NAME, "Errors on records").increment(1L);
- if (!ignoreRecordErrors) {
- throw Throwables.propagate(throwable);
- }
- }
- }
-
- /**
- * A default implementation of {@code ImportPreUpsertKeyValueProcessor} that is used if no
- * specific class is configured. This implementation simply passes through the KeyValue
- * list that is passed in.
- */
- public static class DefaultImportPreUpsertKeyValueProcessor implements
- ImportPreUpsertKeyValueProcessor {
-
- @Override
- public List<KeyValue> preUpsert(byte[] rowKey, List<KeyValue> keyValues) {
- return keyValues;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/952a01cc/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java
index fb61855..e906431 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java
@@ -21,7 +21,11 @@ import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.sql.SQLException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.TreeSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.KeyValue;
@@ -38,7 +42,11 @@ import org.apache.phoenix.mapreduce.bulkload.TargetTableRefFunctions;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.schema.PColumn;
import org.apache.phoenix.schema.PTable;
-import org.apache.phoenix.util.*;
+import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.Closeables;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.SchemaUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,7 +55,7 @@ import org.slf4j.LoggerFactory;
* Performs similar functionality to {@link KeyValueSortReducer}
*/
public class FormatToKeyValueReducer
- extends Reducer<TableRowkeyPair,ImmutableBytesWritable,TableRowkeyPair,KeyValue> {
+ extends Reducer<TableRowkeyPair, ImmutableBytesWritable, TableRowkeyPair, KeyValue> {
protected static final Logger LOG = LoggerFactory.getLogger(FormatToKeyValueReducer.class);
@@ -72,8 +80,8 @@ public class FormatToKeyValueReducer
try {
PhoenixConnection conn = (PhoenixConnection) QueryUtil.getConnection(clientInfos, conf);
builder = conn.getKeyValueBuilder();
- final String tableNamesConf = conf.get(FormatToKeyValueMapper.TABLE_NAMES_CONFKEY);
- final String logicalNamesConf = conf.get(FormatToKeyValueMapper.LOGICAL_NAMES_CONFKEY);
+ final String tableNamesConf = conf.get(FormatToBytesWritableMapper.TABLE_NAMES_CONFKEY);
+ final String logicalNamesConf = conf.get(FormatToBytesWritableMapper.LOGICAL_NAMES_CONFKEY);
tableNames = TargetTableRefFunctions.NAMES_FROM_JSON.apply(tableNamesConf);
logicalNames = TargetTableRefFunctions.NAMES_FROM_JSON.apply(logicalNamesConf);
@@ -91,9 +99,9 @@ public class FormatToKeyValueReducer
emptyFamilyName.add(SchemaUtil.getEmptyColumnFamilyPtr(table));
List<PColumn> cls = table.getColumns();
List<Pair<byte[], byte[]>> list = new ArrayList(cls.size());
- for(int i = 0; i < cls.size(); i++) {
+ for (int i = 0; i < cls.size(); i++) {
PColumn c = cls.get(i);
- if(c.getFamilyName() == null) {
+ if (c.getFamilyName() == null) {
list.add(null); // Skip PK column
continue;
}
@@ -108,14 +116,14 @@ public class FormatToKeyValueReducer
@Override
protected void reduce(TableRowkeyPair key, Iterable<ImmutableBytesWritable> values,
- Reducer<TableRowkeyPair, ImmutableBytesWritable, TableRowkeyPair, KeyValue>.Context context)
- throws IOException, InterruptedException {
+ Reducer<TableRowkeyPair, ImmutableBytesWritable, TableRowkeyPair, KeyValue>.Context context)
+ throws IOException, InterruptedException {
TreeSet<KeyValue> map = new TreeSet<KeyValue>(KeyValue.COMPARATOR);
int tableIndex = tableNames.indexOf(key.getTableName());
List<Pair<byte[], byte[]>> columns = columnIndexes.get(tableIndex);
for (ImmutableBytesWritable aggregatedArray : values) {
DataInputStream input = new DataInputStream(new ByteArrayInputStream(aggregatedArray.get()));
- while (input.available()!= 0) {
+ while (input.available() != 0) {
int index = WritableUtils.readVInt(input);
Pair<byte[], byte[]> pair = columns.get(index);
byte type = input.readByte();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/952a01cc/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/JsonToKeyValueMapper.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/JsonToKeyValueMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/JsonToKeyValueMapper.java
index 5173a0e..2a51b2c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/JsonToKeyValueMapper.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/JsonToKeyValueMapper.java
@@ -37,7 +37,7 @@ import com.google.common.base.Preconditions;
* extracting the created KeyValues and rolling back the statement execution before it is
* committed to HBase.
*/
-public class JsonToKeyValueMapper extends FormatToKeyValueMapper<Map<?, ?>> {
+public class JsonToKeyValueMapper extends FormatToBytesWritableMapper<Map<?, ?>> {
private LineParser<Map<?, ?>> lineParser;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/952a01cc/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/bulkload/TargetTableRefFunctions.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/bulkload/TargetTableRefFunctions.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/bulkload/TargetTableRefFunctions.java
index e02065f..58725c2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/bulkload/TargetTableRefFunctions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/bulkload/TargetTableRefFunctions.java
@@ -78,7 +78,7 @@ public class TargetTableRefFunctions {
}
};
- public static Function<List<TargetTableRef>,String> LOGICAN_NAMES_TO_JSON = new Function<List<TargetTableRef>,String>() {
+ public static Function<List<TargetTableRef>,String> LOGICAL_NAMES_TO_JSON = new Function<List<TargetTableRef>,String>() {
@Override
public String apply(List<TargetTableRef> input) {
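
Aside from the LOGICAN -> LOGICAL typo fix, the behavior here is unchanged: the constant is still a Guava Function that serializes the logical table names to JSON for the job configuration, with NAMES_FROM_JSON as its inverse. The round-trip, assuming tablesToBeLoaded is the List<TargetTableRef> the load tool builds:

    String json = TargetTableRefFunctions.LOGICAL_NAMES_TO_JSON.apply(tablesToBeLoaded);
    List<String> logicalNames = TargetTableRefFunctions.NAMES_FROM_JSON.apply(json);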
http://git-wip-us.apache.org/repos/asf/phoenix/blob/952a01cc/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
index 280daa2..b1879d1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
@@ -36,7 +36,7 @@ import org.apache.hadoop.mapreduce.lib.db.DBInputFormat.NullDBWritable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.phoenix.jdbc.PhoenixConnection;
-import org.apache.phoenix.mapreduce.FormatToKeyValueMapper;
+import org.apache.phoenix.mapreduce.FormatToBytesWritableMapper;
import org.apache.phoenix.mapreduce.ImportPreUpsertKeyValueProcessor;
import org.apache.phoenix.mapreduce.PhoenixInputFormat;
import org.apache.phoenix.util.ColumnInfo;
@@ -418,7 +418,7 @@ public final class PhoenixConfigurationUtil {
Class<? extends ImportPreUpsertKeyValueProcessor> processorClass = null;
try {
processorClass = conf.getClass(
- UPSERT_HOOK_CLASS_CONFKEY, FormatToKeyValueMapper.DefaultImportPreUpsertKeyValueProcessor.class,
+ UPSERT_HOOK_CLASS_CONFKEY, FormatToBytesWritableMapper.DefaultImportPreUpsertKeyValueProcessor.class,
ImportPreUpsertKeyValueProcessor.class);
} catch (Exception e) {
throw new IllegalStateException("Couldn't load upsert hook class", e);
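
The default hook now resolves to the nested class in the renamed mapper. Plugging in a custom ImportPreUpsertKeyValueProcessor remains a one-line configuration entry, as the tests below exercise (MyKeyValueFilter is a hypothetical implementation):

    Configuration conf = new Configuration();
    conf.setClass(PhoenixConfigurationUtil.UPSERT_HOOK_CLASS_CONFKEY,
            MyKeyValueFilter.class,              // hypothetical processor
            ImportPreUpsertKeyValueProcessor.class);
    ImportPreUpsertKeyValueProcessor processor =
            PhoenixConfigurationUtil.loadPreUpsertProcessor(conf);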
http://git-wip-us.apache.org/repos/asf/phoenix/blob/952a01cc/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapperTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapperTest.java b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapperTest.java
new file mode 100644
index 0000000..6424976
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/FormatToBytesWritableMapperTest.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.schema.types.PInteger;
+import org.apache.phoenix.schema.types.PIntegerArray;
+import org.apache.phoenix.schema.types.PUnsignedInt;
+import org.apache.phoenix.util.ColumnInfo;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
+import static org.junit.Assert.assertEquals;
+
+public class FormatToBytesWritableMapperTest {
+
+ @Test
+ public void testBuildColumnInfoList() {
+ List<ColumnInfo> columnInfoList = ImmutableList.of(
+ new ColumnInfo("idCol", PInteger.INSTANCE.getSqlType()),
+ new ColumnInfo("unsignedIntCol", PUnsignedInt.INSTANCE.getSqlType()),
+ new ColumnInfo("stringArrayCol", PIntegerArray.INSTANCE.getSqlType()));
+
+ Configuration conf = new Configuration();
+ FormatToBytesWritableMapper.configureColumnInfoList(conf, columnInfoList);
+ List<ColumnInfo> fromConfig = FormatToBytesWritableMapper.buildColumnInfoList(conf);
+
+ assertEquals(columnInfoList, fromConfig);
+ }
+
+ @Test
+ public void testBuildColumnInfoList_ContainingNulls() {
+ // A null value in the column info list means "skip that column in the input"
+ List<ColumnInfo> columnInfoListWithNull = Lists.newArrayList(
+ new ColumnInfo("idCol", PInteger.INSTANCE.getSqlType()),
+ null,
+ new ColumnInfo("unsignedIntCol", PUnsignedInt.INSTANCE.getSqlType()),
+ new ColumnInfo("stringArrayCol", PIntegerArray.INSTANCE.getSqlType()));
+
+ Configuration conf = new Configuration();
+ FormatToBytesWritableMapper.configureColumnInfoList(conf, columnInfoListWithNull);
+ List<ColumnInfo> fromConfig = FormatToBytesWritableMapper.buildColumnInfoList(conf);
+
+ assertEquals(columnInfoListWithNull, fromConfig);
+ }
+
+ @Test
+ public void testLoadPreUpdateProcessor() {
+ Configuration conf = new Configuration();
+ conf.setClass(PhoenixConfigurationUtil.UPSERT_HOOK_CLASS_CONFKEY, MockUpsertProcessor.class,
+ ImportPreUpsertKeyValueProcessor.class);
+
+ ImportPreUpsertKeyValueProcessor processor = PhoenixConfigurationUtil.loadPreUpsertProcessor(conf);
+ assertEquals(MockUpsertProcessor.class, processor.getClass());
+ }
+
+ @Test
+ public void testLoadPreUpdateProcessor_NotConfigured() {
+
+ Configuration conf = new Configuration();
+ ImportPreUpsertKeyValueProcessor processor = PhoenixConfigurationUtil.loadPreUpsertProcessor(conf);
+
+ assertEquals(FormatToBytesWritableMapper.DefaultImportPreUpsertKeyValueProcessor.class,
+ processor.getClass());
+ }
+
+ @Test(expected=IllegalStateException.class)
+ public void testLoadPreUpdateProcessor_ClassNotFound() {
+ Configuration conf = new Configuration();
+ conf.set(PhoenixConfigurationUtil.UPSERT_HOOK_CLASS_CONFKEY, "MyUndefinedClass");
+
+ PhoenixConfigurationUtil.loadPreUpsertProcessor(conf);
+ }
+
+ static class MockUpsertProcessor implements ImportPreUpsertKeyValueProcessor {
+ @Override
+ public List<KeyValue> preUpsert(byte[] rowKey, List<KeyValue> keyValues) {
+ throw new UnsupportedOperationException("Not yet implemented");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/952a01cc/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/FormatToKeyValueMapperTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/FormatToKeyValueMapperTest.java b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/FormatToKeyValueMapperTest.java
deleted file mode 100644
index 3455616..0000000
--- a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/FormatToKeyValueMapperTest.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.mapreduce;
-
-import java.util.List;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
-import org.apache.phoenix.schema.types.PInteger;
-import org.apache.phoenix.schema.types.PIntegerArray;
-import org.apache.phoenix.schema.types.PUnsignedInt;
-import org.apache.phoenix.util.ColumnInfo;
-import org.junit.Test;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-
-import static org.junit.Assert.assertEquals;
-
-public class FormatToKeyValueMapperTest {
-
- @Test
- public void testBuildColumnInfoList() {
- List<ColumnInfo> columnInfoList = ImmutableList.of(
- new ColumnInfo("idCol", PInteger.INSTANCE.getSqlType()),
- new ColumnInfo("unsignedIntCol", PUnsignedInt.INSTANCE.getSqlType()),
- new ColumnInfo("stringArrayCol", PIntegerArray.INSTANCE.getSqlType()));
-
- Configuration conf = new Configuration();
- FormatToKeyValueMapper.configureColumnInfoList(conf, columnInfoList);
- List<ColumnInfo> fromConfig = FormatToKeyValueMapper.buildColumnInfoList(conf);
-
- assertEquals(columnInfoList, fromConfig);
- }
-
- @Test
- public void testBuildColumnInfoList_ContainingNulls() {
- // A null value in the column info list means "skip that column in the input"
- List<ColumnInfo> columnInfoListWithNull = Lists.newArrayList(
- new ColumnInfo("idCol", PInteger.INSTANCE.getSqlType()),
- null,
- new ColumnInfo("unsignedIntCol", PUnsignedInt.INSTANCE.getSqlType()),
- new ColumnInfo("stringArrayCol", PIntegerArray.INSTANCE.getSqlType()));
-
- Configuration conf = new Configuration();
- FormatToKeyValueMapper.configureColumnInfoList(conf, columnInfoListWithNull);
- List<ColumnInfo> fromConfig = FormatToKeyValueMapper.buildColumnInfoList(conf);
-
- assertEquals(columnInfoListWithNull, fromConfig);
- }
-
- @Test
- public void testLoadPreUpdateProcessor() {
- Configuration conf = new Configuration();
- conf.setClass(PhoenixConfigurationUtil.UPSERT_HOOK_CLASS_CONFKEY, MockUpsertProcessor.class,
- ImportPreUpsertKeyValueProcessor.class);
-
- ImportPreUpsertKeyValueProcessor processor = PhoenixConfigurationUtil.loadPreUpsertProcessor(conf);
- assertEquals(MockUpsertProcessor.class, processor.getClass());
- }
-
- @Test
- public void testLoadPreUpdateProcessor_NotConfigured() {
-
- Configuration conf = new Configuration();
- ImportPreUpsertKeyValueProcessor processor = PhoenixConfigurationUtil.loadPreUpsertProcessor(conf);
-
- assertEquals(FormatToKeyValueMapper.DefaultImportPreUpsertKeyValueProcessor.class,
- processor.getClass());
- }
-
- @Test(expected=IllegalStateException.class)
- public void testLoadPreUpdateProcessor_ClassNotFound() {
- Configuration conf = new Configuration();
- conf.set(PhoenixConfigurationUtil.UPSERT_HOOK_CLASS_CONFKEY, "MyUndefinedClass");
-
- PhoenixConfigurationUtil.loadPreUpsertProcessor(conf);
- }
-
- static class MockUpsertProcessor implements ImportPreUpsertKeyValueProcessor {
- @Override
- public List<KeyValue> preUpsert(byte[] rowKey, List<KeyValue> keyValues) {
- throw new UnsupportedOperationException("Not yet implemented");
- }
- }
-}