You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by sa...@apache.org on 2017/02/14 23:41:54 UTC
[05/50] [abbrv] phoenix git commit: PHOENIX-3134 varbinary fields
bulk load difference between MR/psql and upserts
PHOENIX-3134 varbinary fields bulk load difference between MR/psql and upserts
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/c7bb3faf
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/c7bb3faf
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/c7bb3faf
Branch: refs/heads/encodecolumns2
Commit: c7bb3faff5c2a3c19198e681e6989d25e57de2ba
Parents: 4f97085
Author: Ankit Singhal <an...@gmail.com>
Authored: Wed Jan 18 13:35:43 2017 +0530
Committer: Ankit Singhal <an...@gmail.com>
Committed: Wed Jan 18 13:35:43 2017 +0530
----------------------------------------------------------------------
.../expression/function/EncodeFormat.java | 4 +-
.../phoenix/mapreduce/CsvBulkImportUtil.java | 7 +-
.../phoenix/mapreduce/CsvBulkLoadTool.java | 12 ++-
.../org/apache/phoenix/query/QueryServices.java | 3 +
.../phoenix/query/QueryServicesOptions.java | 9 ++-
.../apache/phoenix/schema/types/PBinary.java | 6 +-
.../apache/phoenix/schema/types/PVarbinary.java | 5 +-
.../org/apache/phoenix/util/PhoenixRuntime.java | 11 +++
.../phoenix/util/csv/CsvUpsertExecutor.java | 25 ++++++
.../phoenix/util/json/JsonUpsertExecutor.java | 44 +++++++++++
.../mapreduce/CsvBulkImportUtilTest.java | 8 +-
.../util/AbstractUpsertExecutorTest.java | 82 +++++++++++++++++---
.../phoenix/util/csv/CsvUpsertExecutorTest.java | 26 +++----
.../util/json/JsonUpsertExecutorTest.java | 6 ++
14 files changed, 207 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c7bb3faf/phoenix-core/src/main/java/org/apache/phoenix/expression/function/EncodeFormat.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/function/EncodeFormat.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/function/EncodeFormat.java
index ca6cb66..8130228 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/expression/function/EncodeFormat.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/function/EncodeFormat.java
@@ -20,5 +20,7 @@ package org.apache.phoenix.expression.function;
public enum EncodeFormat {
HEX, //format for encoding HEX value to bytes
- BASE62 //format for encoding a base 10 long value to base 62 string
+ BASE62, //format for encoding a base 10 long value to base 62 string
+ BASE64, //format for encoding binary data as a base 64 string
+ ASCII // plain text: the string itself is converted to bytes, no decoding
};
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c7bb3faf/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkImportUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkImportUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkImportUtil.java
index 9289dbf..ff9ff72 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkImportUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkImportUtil.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.util.Base64;
import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.query.QueryServices;
import com.google.common.annotations.VisibleForTesting;
@@ -38,15 +39,19 @@ public class CsvBulkImportUtil {
* @param quoteChar quote character for the CSV input
* @param escapeChar escape character for the CSV input
* @param arrayDelimiter array delimiter character, can be null
+ * @param binaryEncoding encoding of binary column values in the input (e.g. BASE64 or ASCII), can be null
*/
public static void initCsvImportJob(Configuration conf, char fieldDelimiter, char quoteChar,
- char escapeChar, String arrayDelimiter) {
+ char escapeChar, String arrayDelimiter, String binaryEncoding) {
setChar(conf, CsvToKeyValueMapper.FIELD_DELIMITER_CONFKEY, fieldDelimiter);
setChar(conf, CsvToKeyValueMapper.QUOTE_CHAR_CONFKEY, quoteChar);
setChar(conf, CsvToKeyValueMapper.ESCAPE_CHAR_CONFKEY, escapeChar);
if (arrayDelimiter != null) {
conf.set(CsvToKeyValueMapper.ARRAY_DELIMITER_CONFKEY, arrayDelimiter);
}
+ if(binaryEncoding!=null){
+ conf.set(QueryServices.UPLOAD_BINARY_DATA_TYPE_ENCODING, binaryEncoding);
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c7bb3faf/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java
index 8ed66b8..14b8c34 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkLoadTool.java
@@ -35,6 +35,7 @@ public class CsvBulkLoadTool extends AbstractBulkLoadTool {
static final Option QUOTE_OPT = new Option("q", "quote", true, "Supply a custom phrase delimiter, defaults to double quote character");
static final Option ESCAPE_OPT = new Option("e", "escape", true, "Supply a custom escape character, default is a backslash");
static final Option ARRAY_DELIMITER_OPT = new Option("a", "array-delimiter", true, "Array element delimiter (optional)");
+ static final Option binaryEncodingOption = new Option("b", "binaryEncoding", true, "Specifies binary encoding");
@Override
protected Options getOptions() {
@@ -43,6 +44,7 @@ public class CsvBulkLoadTool extends AbstractBulkLoadTool {
options.addOption(QUOTE_OPT);
options.addOption(ESCAPE_OPT);
options.addOption(ARRAY_DELIMITER_OPT);
+ options.addOption(binaryEncodingOption);
return options;
}
@@ -79,13 +81,19 @@ public class CsvBulkLoadTool extends AbstractBulkLoadTool {
}
escapeChar = escapeString.charAt(0);
}
-
+
+ String binaryEncoding = null;
+ if (cmdLine.hasOption(binaryEncodingOption.getOpt())) {
+ binaryEncoding = cmdLine.getOptionValue(binaryEncodingOption.getOpt());
+ }
+
CsvBulkImportUtil.initCsvImportJob(
conf,
delimiterChar,
quoteChar,
escapeChar,
- cmdLine.getOptionValue(ARRAY_DELIMITER_OPT.getOpt()));
+ cmdLine.getOptionValue(ARRAY_DELIMITER_OPT.getOpt()),
+ binaryEncoding);
}
@Override
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c7bb3faf/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index ab8fb72..044768a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -224,6 +224,9 @@ public interface QueryServices extends SQLCloseable {
public static final String INDEX_POPULATION_SLEEP_TIME = "phoenix.index.population.wait.time";
public static final String LOCAL_INDEX_CLIENT_UPGRADE_ATTRIB = "phoenix.client.localIndexUpgrade";
public static final String LIMITED_QUERY_SERIAL_THRESHOLD = "phoenix.limited.query.serial.threshold";
+
+ // currently BASE64 and ASCII are supported
+ public static final String UPLOAD_BINARY_DATA_TYPE_ENCODING = "phoenix.upload.binaryDataType.encoding";
public static final String INDEX_ASYNC_BUILD_ENABLED = "phoenix.index.async.build.enabled";
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c7bb3faf/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 8c07ca6..a785436 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -79,6 +79,7 @@ import static org.apache.phoenix.query.QueryServices.STATS_USE_CURRENT_TIME_ATTR
import static org.apache.phoenix.query.QueryServices.THREAD_POOL_SIZE_ATTRIB;
import static org.apache.phoenix.query.QueryServices.THREAD_TIMEOUT_MS_ATTRIB;
import static org.apache.phoenix.query.QueryServices.TRANSACTIONS_ENABLED;
+import static org.apache.phoenix.query.QueryServices.UPLOAD_BINARY_DATA_TYPE_ENCODING;
import static org.apache.phoenix.query.QueryServices.USE_BYTE_BASED_REGEX_ATTRIB;
import static org.apache.phoenix.query.QueryServices.USE_INDEXES_ATTRIB;
@@ -97,6 +98,7 @@ import org.apache.phoenix.util.DateUtil;
import org.apache.phoenix.util.ReadOnlyProps;
+
/**
* Options for {@link QueryServices}.
*
@@ -268,6 +270,10 @@ public class QueryServicesOptions {
}
};
public static final String DEFAULT_SCHEMA = null;
+ public static final String DEFAULT_UPLOAD_BINARY_DATA_TYPE_ENCODING = "BASE64"; // for backward compatibility: up to
+ // 4.10, psql and CsvBulkLoadTool
+ // expect binary data to be base64
+ // encoded
private final Configuration config;
@@ -335,7 +341,8 @@ public class QueryServicesOptions {
.setIfUnset(IS_NAMESPACE_MAPPING_ENABLED, DEFAULT_IS_NAMESPACE_MAPPING_ENABLED)
.setIfUnset(IS_SYSTEM_TABLE_MAPPED_TO_NAMESPACE, DEFAULT_IS_SYSTEM_TABLE_MAPPED_TO_NAMESPACE)
.setIfUnset(LOCAL_INDEX_CLIENT_UPGRADE_ATTRIB, DEFAULT_LOCAL_INDEX_CLIENT_UPGRADE)
- .setIfUnset(AUTO_UPGRADE_ENABLED, DEFAULT_AUTO_UPGRADE_ENABLED);
+ .setIfUnset(AUTO_UPGRADE_ENABLED, DEFAULT_AUTO_UPGRADE_ENABLED)
+ .setIfUnset(UPLOAD_BINARY_DATA_TYPE_ENCODING, DEFAULT_UPLOAD_BINARY_DATA_TYPE_ENCODING);
// HBase sets this to 1, so we reset it to something more appropriate.
// Hopefully HBase will change this, because we can't know if a user set
// it to 1, so we'll change it.
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c7bb3faf/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PBinary.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PBinary.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PBinary.java
index 43906f0..3a9dcc7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PBinary.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PBinary.java
@@ -22,7 +22,6 @@ import java.text.Format;
import java.util.Arrays;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.util.Base64;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.exception.DataExceedsCapacityException;
import org.apache.phoenix.query.QueryConstants;
@@ -182,10 +181,7 @@ public class PBinary extends PBinaryBase {
@Override
public Object toObject(String value) {
- if (value == null || value.length() == 0) {
- return null;
- }
- return Base64.decode(value);
+ return PVarbinary.INSTANCE.toObject(value);
}
@Override
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c7bb3faf/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PVarbinary.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PVarbinary.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PVarbinary.java
index d96650d..b3ce57a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PVarbinary.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/types/PVarbinary.java
@@ -131,7 +131,10 @@ public class PVarbinary extends PBinaryBase {
if (value == null || value.length() == 0) {
return null;
}
- return Base64.decode(value);
+ Object object = Base64.decode(value);
+ if (object == null) { throw newIllegalDataException(
+ "Input: [" + value + "] is not base64 encoded"); }
+ return object;
}
@Override
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c7bb3faf/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
index 0c74b84..1d9f946 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java
@@ -216,6 +216,9 @@ public class PhoenixRuntime {
if (execCmd.isLocalIndexUpgrade()) {
props.setProperty(QueryServices.LOCAL_INDEX_CLIENT_UPGRADE_ATTRIB, "false");
}
+ if (execCmd.binaryEncoding != null) {
+ props.setProperty(QueryServices.UPLOAD_BINARY_DATA_TYPE_ENCODING, execCmd.binaryEncoding);
+ }
conn = DriverManager.getConnection(jdbcUrl, props).unwrap(PhoenixConnection.class);
conn.setRunningUpgrade(true);
if (execCmd.isMapNamespace()) {
@@ -535,6 +538,7 @@ public class PhoenixRuntime {
private boolean mapNamespace;
private String srcTable;
private boolean localIndexUpgrade;
+ private String binaryEncoding;
/**
* Factory method to build up an {@code ExecutionCommand} based on supplied parameters.
@@ -542,6 +546,8 @@ public class PhoenixRuntime {
public static ExecutionCommand parseArgs(String[] args) {
Option tableOption = new Option("t", "table", true,
"Overrides the table into which the CSV data is loaded and is case sensitive");
+ Option binaryEncodingOption = new Option("b", "binaryEncoding", true,
+ "Specifies binary encoding");
Option headerOption = new Option("h", "header", true, "Overrides the column names to" +
" which the CSV data maps and is case sensitive. A special value of " +
"in-line indicating that the first line of the CSV file determines the " +
@@ -591,6 +597,7 @@ public class PhoenixRuntime {
options.addOption(bypassUpgradeOption);
options.addOption(mapNamespaceOption);
options.addOption(localIndexUpgradeOption);
+ options.addOption(binaryEncodingOption);
CommandLineParser parser = new PosixParser();
CommandLine cmdLine = null;
@@ -609,6 +616,10 @@ public class PhoenixRuntime {
if (cmdLine.hasOption(tableOption.getOpt())) {
execCmd.tableName = cmdLine.getOptionValue(tableOption.getOpt());
}
+
+ if (cmdLine.hasOption(binaryEncodingOption.getOpt())) {
+ execCmd.binaryEncoding = cmdLine.getOptionValue(binaryEncodingOption.getOpt());
+ }
if (cmdLine.hasOption(headerOption.getOpt())) {
String columnString = cmdLine.getOptionValue(headerOption.getOpt());
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c7bb3faf/phoenix-core/src/main/java/org/apache/phoenix/util/csv/CsvUpsertExecutor.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/csv/CsvUpsertExecutor.java b/phoenix-core/src/main/java/org/apache/phoenix/util/csv/CsvUpsertExecutor.java
index 0d3e17d..cd40b44 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/csv/CsvUpsertExecutor.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/csv/CsvUpsertExecutor.java
@@ -27,12 +27,18 @@ import java.util.Properties;
import javax.annotation.Nullable;
import org.apache.commons.csv.CSVRecord;
+import org.apache.hadoop.hbase.util.Base64;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.expression.function.EncodeFormat;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.IllegalDataException;
+import org.apache.phoenix.schema.types.PBinary;
import org.apache.phoenix.schema.types.PBoolean;
import org.apache.phoenix.schema.types.PDataType;
import org.apache.phoenix.schema.types.PDataType.PDataCodec;
import org.apache.phoenix.schema.types.PTimestamp;
+import org.apache.phoenix.schema.types.PVarbinary;
import org.apache.phoenix.util.ColumnInfo;
import org.apache.phoenix.util.DateUtil;
import org.apache.phoenix.util.UpsertExecutor;
@@ -116,6 +122,7 @@ public class CsvUpsertExecutor extends UpsertExecutor<CSVRecord, String> {
private final PDataType dataType;
private final PDataCodec codec;
private final DateUtil.DateTimeParser dateTimeParser;
+ private final String binaryEncoding;
SimpleDatatypeConversionFunction(PDataType dataType, Connection conn) {
Properties props;
@@ -148,6 +155,8 @@ public class CsvUpsertExecutor extends UpsertExecutor<CSVRecord, String> {
this.dateTimeParser = null;
}
this.codec = codec;
+ this.binaryEncoding = props.getProperty(QueryServices.UPLOAD_BINARY_DATA_TYPE_ENCODING,
+ QueryServicesOptions.DEFAULT_UPLOAD_BINARY_DATA_TYPE_ENCODING);
}
@Nullable
@@ -175,6 +184,22 @@ public class CsvUpsertExecutor extends UpsertExecutor<CSVRecord, String> {
throw new RuntimeException("Invalid boolean value: '" + input
+ "', must be one of ['true','t','1','false','f','0']");
}
+ }else if (dataType == PVarbinary.INSTANCE || dataType == PBinary.INSTANCE){
+ EncodeFormat format = EncodeFormat.valueOf(binaryEncoding.toUpperCase());
+ Object object = null;
+ switch (format) {
+ case BASE64:
+ object = Base64.decode(input);
+ if (object == null) { throw new IllegalDataException(
+ "Input: [" + input + "] is not base64 encoded"); }
+ break;
+ case ASCII:
+ object = Bytes.toBytes(input);
+ break;
+ default:
+ throw new IllegalDataException("Unsupported encoding \"" + binaryEncoding + "\"");
+ }
+ return object;
}
return dataType.toObject(input);
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c7bb3faf/phoenix-core/src/main/java/org/apache/phoenix/util/json/JsonUpsertExecutor.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/json/JsonUpsertExecutor.java b/phoenix-core/src/main/java/org/apache/phoenix/util/json/JsonUpsertExecutor.java
index bbe0e30..ffa797d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/json/JsonUpsertExecutor.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/json/JsonUpsertExecutor.java
@@ -25,12 +25,20 @@ import java.sql.Types;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+
import javax.annotation.Nullable;
+import org.apache.hadoop.hbase.util.Base64;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.expression.function.EncodeFormat;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.IllegalDataException;
+import org.apache.phoenix.schema.types.PBinary;
+import org.apache.phoenix.schema.types.PBoolean;
import org.apache.phoenix.schema.types.PDataType;
import org.apache.phoenix.schema.types.PTimestamp;
+import org.apache.phoenix.schema.types.PVarbinary;
import org.apache.phoenix.util.ColumnInfo;
import org.apache.phoenix.util.DateUtil;
import org.apache.phoenix.util.UpsertExecutor;
@@ -137,6 +145,7 @@ public class JsonUpsertExecutor extends UpsertExecutor<Map<?, ?>, Object> {
private final PDataType dataType;
private final DateUtil.DateTimeParser dateTimeParser;
+ private final String binaryEncoding;
SimpleDatatypeConversionFunction(PDataType dataType, Connection conn) {
Properties props;
@@ -166,6 +175,8 @@ public class JsonUpsertExecutor extends UpsertExecutor<Map<?, ?>, Object> {
} else {
this.dateTimeParser = null;
}
+ this.binaryEncoding = props.getProperty(QueryServices.UPLOAD_BINARY_DATA_TYPE_ENCODING,
+ QueryServicesOptions.DEFAULT_UPLOAD_BINARY_DATA_TYPE_ENCODING);
}
@Nullable
@@ -180,7 +191,40 @@ public class JsonUpsertExecutor extends UpsertExecutor<Map<?, ?>, Object> {
byte[] byteValue = new byte[dataType.getByteSize()];
dataType.getCodec().encodeLong(epochTime, byteValue, 0);
return dataType.toObject(byteValue);
+ }else if (dataType == PBoolean.INSTANCE) {
+ switch (input.toString()) {
+ case "true":
+ case "t":
+ case "T":
+ case "1":
+ return Boolean.TRUE;
+ case "false":
+ case "f":
+ case "F":
+ case "0":
+ return Boolean.FALSE;
+ default:
+ throw new RuntimeException("Invalid boolean value: '" + input
+ + "', must be one of ['true','t','1','false','f','0']");
+ }
+ }else if (dataType == PVarbinary.INSTANCE || dataType == PBinary.INSTANCE){
+ EncodeFormat format = EncodeFormat.valueOf(binaryEncoding.toUpperCase());
+ Object object = null;
+ switch (format) {
+ case BASE64:
+ object = Base64.decode(input.toString());
+ if (object == null) { throw new IllegalDataException(
+ "Input: [" + input + "] is not base64 encoded"); }
+ break;
+ case ASCII:
+ object = Bytes.toBytes(input.toString());
+ break;
+ default:
+ throw new IllegalDataException("Unsupported encoding \"" + binaryEncoding + "\"");
}
+ return object;
+ }
+
return dataType.toObject(input, dataType);
}
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c7bb3faf/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvBulkImportUtilTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvBulkImportUtilTest.java b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvBulkImportUtilTest.java
index 3c6271a..33c72a8 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvBulkImportUtilTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvBulkImportUtilTest.java
@@ -17,6 +17,9 @@
*/
package org.apache.phoenix.mapreduce;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
@@ -28,9 +31,6 @@ import org.apache.hadoop.hbase.KeyValue;
import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
import org.junit.Test;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-
public class CsvBulkImportUtilTest {
@Test
@@ -41,7 +41,7 @@ public class CsvBulkImportUtilTest {
char quote = '\002';
char escape = '!';
- CsvBulkImportUtil.initCsvImportJob(conf, delimiter, quote, escape, null);
+ CsvBulkImportUtil.initCsvImportJob(conf, delimiter, quote, escape, null, null);
// Serialize and deserialize the config to ensure that there aren't any issues
// with non-printable characters as delimiters
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c7bb3faf/phoenix-core/src/test/java/org/apache/phoenix/util/AbstractUpsertExecutorTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/AbstractUpsertExecutorTest.java b/phoenix-core/src/test/java/org/apache/phoenix/util/AbstractUpsertExecutorTest.java
index 61b03fb..2b2544d 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/AbstractUpsertExecutorTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/AbstractUpsertExecutorTest.java
@@ -17,6 +17,12 @@
*/
package org.apache.phoenix.util;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
@@ -25,9 +31,14 @@ import java.sql.SQLException;
import java.sql.Types;
import java.util.Arrays;
import java.util.List;
+import java.util.Properties;
+import org.apache.hadoop.hbase.util.Base64;
+import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.query.BaseConnectionlessQueryTest;
+import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.schema.types.PArrayDataType;
+import org.apache.phoenix.schema.types.PBinary;
import org.apache.phoenix.schema.types.PInteger;
import org.apache.phoenix.schema.types.PIntegerArray;
import org.junit.After;
@@ -36,12 +47,6 @@ import org.junit.Test;
import com.google.common.collect.ImmutableList;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
-
public abstract class AbstractUpsertExecutorTest<R, F> extends BaseConnectionlessQueryTest {
protected Connection conn;
@@ -51,6 +56,7 @@ public abstract class AbstractUpsertExecutorTest<R, F> extends BaseConnectionles
protected abstract UpsertExecutor<R, F> getUpsertExecutor();
protected abstract R createRecord(Object... columnValues) throws IOException;
+ protected abstract UpsertExecutor<R, F> getUpsertExecutor(Connection conn);
@Before
public void setUp() throws SQLException {
@@ -59,7 +65,8 @@ public abstract class AbstractUpsertExecutorTest<R, F> extends BaseConnectionles
new ColumnInfo("NAME", Types.VARCHAR),
new ColumnInfo("AGE", Types.INTEGER),
new ColumnInfo("VALUES", PIntegerArray.INSTANCE.getSqlType()),
- new ColumnInfo("BEARD", Types.BOOLEAN));
+ new ColumnInfo("BEARD", Types.BOOLEAN),
+ new ColumnInfo("PIC", Types.BINARY));
preparedStatement = mock(PreparedStatement.class);
upsertListener = mock(UpsertExecutor.UpsertListener.class);
@@ -73,8 +80,10 @@ public abstract class AbstractUpsertExecutorTest<R, F> extends BaseConnectionles
@Test
public void testExecute() throws Exception {
+ byte[] binaryData=(byte[])PBinary.INSTANCE.getSampleValue();
+ String encodedBinaryData = Base64.encodeBytes(binaryData);
getUpsertExecutor().execute(createRecord(123L, "NameValue", 42,
- Arrays.asList(1, 2, 3), true));
+ Arrays.asList(1, 2, 3), true, encodedBinaryData));
verify(upsertListener).upsertDone(1L);
verifyNoMoreInteractions(upsertListener);
@@ -84,6 +93,7 @@ public abstract class AbstractUpsertExecutorTest<R, F> extends BaseConnectionles
verify(preparedStatement).setObject(3, Integer.valueOf(42));
verify(preparedStatement).setObject(4, PArrayDataType.instantiatePhoenixArray(PInteger.INSTANCE, new Object[]{1,2,3}));
verify(preparedStatement).setObject(5, Boolean.TRUE);
+ verify(preparedStatement).setObject(6, binaryData);
verify(preparedStatement).execute();
verifyNoMoreInteractions(preparedStatement);
}
@@ -99,8 +109,10 @@ public abstract class AbstractUpsertExecutorTest<R, F> extends BaseConnectionles
@Test
public void testExecute_TooManyFields() throws Exception {
+ byte[] binaryData=(byte[])PBinary.INSTANCE.getSampleValue();
+ String encodedBinaryData = Base64.encodeBytes(binaryData);
R recordWithTooManyFields = createRecord(123L, "NameValue", 42, Arrays.asList(1, 2, 3),
- true, "Garbage");
+ true, encodedBinaryData, "garbage");
getUpsertExecutor().execute(recordWithTooManyFields);
verify(upsertListener).upsertDone(1L);
@@ -111,14 +123,17 @@ public abstract class AbstractUpsertExecutorTest<R, F> extends BaseConnectionles
verify(preparedStatement).setObject(3, Integer.valueOf(42));
verify(preparedStatement).setObject(4, PArrayDataType.instantiatePhoenixArray(PInteger.INSTANCE, new Object[]{1,2,3}));
verify(preparedStatement).setObject(5, Boolean.TRUE);
+ verify(preparedStatement).setObject(6, binaryData);
verify(preparedStatement).execute();
verifyNoMoreInteractions(preparedStatement);
}
@Test
public void testExecute_NullField() throws Exception {
+ byte[] binaryData=(byte[])PBinary.INSTANCE.getSampleValue();
+ String encodedBinaryData = Base64.encodeBytes(binaryData);
getUpsertExecutor().execute(createRecord(123L, "NameValue", null,
- Arrays.asList(1, 2, 3), false));
+ Arrays.asList(1, 2, 3), false, encodedBinaryData));
verify(upsertListener).upsertDone(1L);
verifyNoMoreInteractions(upsertListener);
@@ -128,17 +143,62 @@ public abstract class AbstractUpsertExecutorTest<R, F> extends BaseConnectionles
verify(preparedStatement).setNull(3, columnInfoList.get(2).getSqlType());
verify(preparedStatement).setObject(4, PArrayDataType.instantiatePhoenixArray(PInteger.INSTANCE, new Object[]{1,2,3}));
verify(preparedStatement).setObject(5, Boolean.FALSE);
+ verify(preparedStatement).setObject(6, binaryData);
verify(preparedStatement).execute();
verifyNoMoreInteractions(preparedStatement);
}
@Test
public void testExecute_InvalidType() throws Exception {
+ byte[] binaryData=(byte[])PBinary.INSTANCE.getSampleValue();
+ String encodedBinaryData = Base64.encodeBytes(binaryData);
R recordWithInvalidType = createRecord(123L, "NameValue", "ThisIsNotANumber",
- Arrays.asList(1, 2, 3), true);
+ Arrays.asList(1, 2, 3), true, encodedBinaryData);
getUpsertExecutor().execute(recordWithInvalidType);
verify(upsertListener).errorOnRecord(eq(recordWithInvalidType), any(Throwable.class));
verifyNoMoreInteractions(upsertListener);
}
+
+ @Test
+ public void testExecute_InvalidBoolean() throws Exception {
+ byte[] binaryData=(byte[])PBinary.INSTANCE.getSampleValue();
+ String encodedBinaryData = Base64.encodeBytes(binaryData);
+ R csvRecordWithInvalidType = createRecord("123,NameValue,42,1:2:3,NotABoolean,"+encodedBinaryData);
+ getUpsertExecutor().execute(csvRecordWithInvalidType);
+
+ verify(upsertListener).errorOnRecord(eq(csvRecordWithInvalidType), any(Throwable.class));
+ }
+
+ @Test
+ public void testExecute_InvalidBinary() throws Exception {
+ String notBase64Encoded="#@$df";
+ R csvRecordWithInvalidType = createRecord("123,NameValue,42,1:2:3,true,"+notBase64Encoded);
+ getUpsertExecutor().execute(csvRecordWithInvalidType);
+
+ verify(upsertListener).errorOnRecord(eq(csvRecordWithInvalidType), any(Throwable.class));
+ }
+
+ @Test
+ public void testExecute_AsciiEncoded() throws Exception {
+ String asciiValue="#@$df";
+ Properties info=new Properties();
+ info.setProperty(QueryServices.UPLOAD_BINARY_DATA_TYPE_ENCODING,"ASCII");
+ getUpsertExecutor(DriverManager.getConnection(getUrl(),info)).execute(createRecord(123L, "NameValue", 42,
+ Arrays.asList(1, 2, 3), true, asciiValue));
+
+ verify(upsertListener).upsertDone(1L);
+ verifyNoMoreInteractions(upsertListener);
+
+ verify(preparedStatement).setObject(1, Long.valueOf(123L));
+ verify(preparedStatement).setObject(2, "NameValue");
+ verify(preparedStatement).setObject(3, Integer.valueOf(42));
+ verify(preparedStatement).setObject(4, PArrayDataType.instantiatePhoenixArray(PInteger.INSTANCE, new Object[]{1,2,3}));
+ verify(preparedStatement).setObject(5, Boolean.TRUE);
+ verify(preparedStatement).setObject(6, Bytes.toBytes(asciiValue));
+ verify(preparedStatement).execute();
+ verifyNoMoreInteractions(preparedStatement);
+ }
+
+
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c7bb3faf/phoenix-core/src/test/java/org/apache/phoenix/util/csv/CsvUpsertExecutorTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/csv/CsvUpsertExecutorTest.java b/phoenix-core/src/test/java/org/apache/phoenix/util/csv/CsvUpsertExecutorTest.java
index c887ff7..a5ec4fa 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/csv/CsvUpsertExecutorTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/csv/CsvUpsertExecutorTest.java
@@ -18,23 +18,19 @@
package org.apache.phoenix.util.csv;
import java.io.IOException;
+import java.sql.Connection;
import java.sql.SQLException;
import java.util.List;
-import com.google.common.base.Joiner;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVParser;
import org.apache.commons.csv.CSVRecord;
import org.apache.phoenix.util.AbstractUpsertExecutorTest;
import org.apache.phoenix.util.UpsertExecutor;
import org.junit.Before;
-import org.junit.Test;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.verify;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Iterables;
public class CsvUpsertExecutorTest extends AbstractUpsertExecutorTest<CSVRecord, String> {
@@ -46,7 +42,13 @@ public class CsvUpsertExecutorTest extends AbstractUpsertExecutorTest<CSVRecord,
public UpsertExecutor<CSVRecord, String> getUpsertExecutor() {
return upsertExecutor;
}
-
+
+ @Override
+ public UpsertExecutor<CSVRecord, String> getUpsertExecutor(Connection conn) {
+ return new CsvUpsertExecutor(conn, columnInfoList, preparedStatement,
+ upsertListener, ARRAY_SEP);
+ }
+
@Override
public CSVRecord createRecord(Object... columnValues) throws IOException {
for (int i = 0; i < columnValues.length; i++) {
@@ -69,11 +71,5 @@ public class CsvUpsertExecutorTest extends AbstractUpsertExecutorTest<CSVRecord,
upsertListener, ARRAY_SEP);
}
- @Test
- public void testExecute_InvalidBoolean() throws Exception {
- CSVRecord csvRecordWithInvalidType = createRecord("123,NameValue,42,1:2:3,NotABoolean");
- upsertExecutor.execute(ImmutableList.of(csvRecordWithInvalidType));
-
- verify(upsertListener).errorOnRecord(eq(csvRecordWithInvalidType), any(Throwable.class));
- }
+
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/c7bb3faf/phoenix-core/src/test/java/org/apache/phoenix/util/json/JsonUpsertExecutorTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/json/JsonUpsertExecutorTest.java b/phoenix-core/src/test/java/org/apache/phoenix/util/json/JsonUpsertExecutorTest.java
index c042dd4..6ac9cf9 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/json/JsonUpsertExecutorTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/json/JsonUpsertExecutorTest.java
@@ -18,6 +18,7 @@
package org.apache.phoenix.util.json;
import java.io.IOException;
+import java.sql.Connection;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
@@ -50,4 +51,9 @@ public class JsonUpsertExecutorTest extends AbstractUpsertExecutorTest<Map<?, ?>
super.setUp();
upsertExecutor = new JsonUpsertExecutor(conn, columnInfoList, preparedStatement, upsertListener);
}
+
+ @Override
+ protected UpsertExecutor<Map<?, ?>, Object> getUpsertExecutor(Connection conn) {
+ return new JsonUpsertExecutor(conn, columnInfoList, preparedStatement, upsertListener);
+ }
}