You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by gr...@apache.org on 2017/02/20 07:55:14 UTC
phoenix git commit: PHOENIX-3538 Regex bulk loader
Repository: phoenix
Updated Branches:
refs/heads/master b5cf5aa2c -> d18da38af
PHOENIX-3538 Regex bulk loader
Add bulk loader which parses input based on a regular expression.
Contributed by kalyanhadooptraining@gmail.com
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/d18da38a
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/d18da38a
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/d18da38a
Branch: refs/heads/master
Commit: d18da38afa0d7bbc0221f6472bc3b037edc6e3d4
Parents: b5cf5aa
Author: Gabriel Reid <ga...@ngdata.com>
Authored: Sun Feb 19 20:28:14 2017 +0100
Committer: Gabriel Reid <ga...@ngdata.com>
Committed: Mon Feb 20 08:17:57 2017 +0100
----------------------------------------------------------------------
.../phoenix/end2end/RegexBulkLoadToolIT.java | 371 +++++++++++++++++++
.../phoenix/mapreduce/RegexBulkLoadTool.java | 74 ++++
.../mapreduce/RegexToKeyValueMapper.java | 135 +++++++
.../phoenix/util/regex/RegexUpsertExecutor.java | 80 ++++
4 files changed, 660 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d18da38a/phoenix-core/src/it/java/org/apache/phoenix/end2end/RegexBulkLoadToolIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/RegexBulkLoadToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/RegexBulkLoadToolIT.java
new file mode 100644
index 0000000..47b0db7
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/RegexBulkLoadToolIT.java
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import static org.apache.phoenix.query.QueryServices.DATE_FORMAT_ATTRIB;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.PrintWriter;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileAlreadyExistsException;
+import org.apache.phoenix.mapreduce.RegexBulkLoadTool;
+import org.apache.phoenix.util.DateUtil;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.TestUtil;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class RegexBulkLoadToolIT extends BaseOwnClusterIT {
+
+ private static Connection conn;
+ private static String zkQuorum;
+
+ @BeforeClass
+ public static void doSetup() throws Exception {
+ setUpTestDriver(ReadOnlyProps.EMPTY_PROPS);
+ zkQuorum = TestUtil.LOCALHOST + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + getUtility().getZkCluster().getClientPort();
+ conn = DriverManager.getConnection(getUrl());
+ }
+
+ @Test
+ public void testBasicImport() throws Exception {
+
+ Statement stmt = conn.createStatement();
+ stmt.execute("CREATE TABLE S.TABLE1 (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, T DATE) SPLIT ON (1,2)");
+
+ FileSystem fs = FileSystem.get(getUtility().getConfiguration());
+ FSDataOutputStream outputStream = fs.create(new Path("/tmp/input1.csv"));
+ PrintWriter printWriter = new PrintWriter(outputStream);
+ printWriter.println("1,Name 1,1970/01/01");
+ printWriter.println("2,Name 2,1970/01/02");
+ printWriter.close();
+
+ RegexBulkLoadTool regexBulkLoadTool = new RegexBulkLoadTool();
+ regexBulkLoadTool.setConf(getUtility().getConfiguration());
+ regexBulkLoadTool.getConf().set(DATE_FORMAT_ATTRIB,"yyyy/MM/dd");
+ int exitCode = regexBulkLoadTool.run(new String[] {
+ "--input", "/tmp/input1.csv",
+ "--table", "table1",
+ "--schema", "s",
+ "--regex", "([^,]*),([^,]*),([^,]*)",
+ "--zookeeper", zkQuorum});
+ assertEquals(0, exitCode);
+
+ ResultSet rs = stmt.executeQuery("SELECT id, name, t FROM s.table1 ORDER BY id");
+ assertTrue(rs.next());
+ assertEquals(1, rs.getInt(1));
+ assertEquals("Name 1", rs.getString(2));
+ assertEquals(DateUtil.parseDate("1970-01-01"), rs.getDate(3));
+ assertTrue(rs.next());
+ assertEquals(2, rs.getInt(1));
+ assertEquals("Name 2", rs.getString(2));
+ assertEquals(DateUtil.parseDate("1970-01-02"), rs.getDate(3));
+ assertFalse(rs.next());
+
+ rs.close();
+ stmt.close();
+ }
+
+ @Test
+ public void testFullOptionImport() throws Exception {
+
+ Statement stmt = conn.createStatement();
+ stmt.execute("CREATE TABLE TABLE2 (ID INTEGER NOT NULL PRIMARY KEY, " +
+ "NAME VARCHAR, NAMES VARCHAR ARRAY, FLAG BOOLEAN)");
+
+ FileSystem fs = FileSystem.get(getUtility().getConfiguration());
+ FSDataOutputStream outputStream = fs.create(new Path("/tmp/input2.csv"));
+ PrintWriter printWriter = new PrintWriter(outputStream);
+ printWriter.println("1|Name 1a;Name 1b,true");
+ printWriter.println("2|Name 2a;Name 2b,false");
+ printWriter.close();
+
+ RegexBulkLoadTool regexBulkLoadTool = new RegexBulkLoadTool();
+ regexBulkLoadTool.setConf(getUtility().getConfiguration());
+ int exitCode = regexBulkLoadTool.run(new String[] {
+ "--input", "/tmp/input2.csv",
+ "--table", "table2",
+ "--zookeeper", zkQuorum,
+ "--array-delimiter", ";",
+ "--regex", "([^|]*)\\|([^,]*),([^,]*)",
+ "--import-columns", "ID,NAMES,FLAG"});
+ assertEquals(0, exitCode);
+
+ ResultSet rs = stmt.executeQuery("SELECT id, names FROM table2 ORDER BY id");
+ assertTrue(rs.next());
+ assertEquals(1, rs.getInt(1));
+ assertArrayEquals(new Object[] { "Name 1a", "Name 1b" }, (Object[]) rs.getArray(2).getArray());
+ assertTrue(rs.next());
+ assertEquals(2, rs.getInt(1));
+ assertArrayEquals(new Object[] { "Name 2a", "Name 2b" }, (Object[]) rs.getArray(2).getArray());
+ assertFalse(rs.next());
+
+ rs.close();
+ stmt.close();
+ }
+
+ @Test
+ public void testMultipleInputFiles() throws Exception {
+
+ Statement stmt = conn.createStatement();
+ stmt.execute("CREATE TABLE TABLE7 (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, T DATE) SPLIT ON (1,2)");
+
+ FileSystem fs = FileSystem.get(getUtility().getConfiguration());
+ FSDataOutputStream outputStream = fs.create(new Path("/tmp/input1.csv"));
+ PrintWriter printWriter = new PrintWriter(outputStream);
+ printWriter.println("1,Name 1,1970/01/01");
+ printWriter.close();
+ outputStream = fs.create(new Path("/tmp/input2.csv"));
+ printWriter = new PrintWriter(outputStream);
+ printWriter.println("2,Name 2,1970/01/02");
+ printWriter.close();
+
+ RegexBulkLoadTool regexBulkLoadTool = new RegexBulkLoadTool();
+ regexBulkLoadTool.setConf(getUtility().getConfiguration());
+ regexBulkLoadTool.getConf().set(DATE_FORMAT_ATTRIB,"yyyy/MM/dd");
+ int exitCode = regexBulkLoadTool.run(new String[] {
+ "--input", "/tmp/input1.csv,/tmp/input2.csv",
+ "--table", "table7",
+ "--regex", "([^,]*),([^,]*),([^,]*)",
+ "--zookeeper", zkQuorum});
+ assertEquals(0, exitCode);
+
+ ResultSet rs = stmt.executeQuery("SELECT id, name, t FROM table7 ORDER BY id");
+ assertTrue(rs.next());
+ assertEquals(1, rs.getInt(1));
+ assertEquals("Name 1", rs.getString(2));
+ assertEquals(DateUtil.parseDate("1970-01-01"), rs.getDate(3));
+ assertTrue(rs.next());
+ assertEquals(2, rs.getInt(1));
+ assertEquals("Name 2", rs.getString(2));
+ assertEquals(DateUtil.parseDate("1970-01-02"), rs.getDate(3));
+ assertFalse(rs.next());
+
+ rs.close();
+ stmt.close();
+ }
+
+ @Test
+ public void testImportWithIndex() throws Exception {
+
+ Statement stmt = conn.createStatement();
+ stmt.execute("CREATE TABLE TABLE3 (ID INTEGER NOT NULL PRIMARY KEY, " +
+ "FIRST_NAME VARCHAR, LAST_NAME VARCHAR)");
+ String ddl = "CREATE INDEX TABLE3_IDX ON TABLE3 "
+ + " (FIRST_NAME ASC)"
+ + " INCLUDE (LAST_NAME)";
+ stmt.execute(ddl);
+
+ FileSystem fs = FileSystem.get(getUtility().getConfiguration());
+ FSDataOutputStream outputStream = fs.create(new Path("/tmp/input3.csv"));
+ PrintWriter printWriter = new PrintWriter(outputStream);
+ printWriter.println("1,FirstName 1,LastName 1");
+ printWriter.println("2,FirstName 2,LastName 2");
+ printWriter.close();
+
+ RegexBulkLoadTool regexBulkLoadTool = new RegexBulkLoadTool();
+ regexBulkLoadTool.setConf(getUtility().getConfiguration());
+ int exitCode = regexBulkLoadTool.run(new String[] {
+ "--input", "/tmp/input3.csv",
+ "--table", "table3",
+ "--regex", "([^,]*),([^,]*),([^,]*)",
+ "--zookeeper", zkQuorum});
+ assertEquals(0, exitCode);
+
+ ResultSet rs = stmt.executeQuery("SELECT id, FIRST_NAME FROM TABLE3 where first_name='FirstName 2'");
+ assertTrue(rs.next());
+ assertEquals(2, rs.getInt(1));
+ assertEquals("FirstName 2", rs.getString(2));
+
+ rs.close();
+ stmt.close();
+ }
+
+ @Test
+ public void testImportWithLocalIndex() throws Exception {
+
+ Statement stmt = conn.createStatement();
+ stmt.execute("CREATE TABLE TABLE6 (ID INTEGER NOT NULL PRIMARY KEY, " +
+ "FIRST_NAME VARCHAR, LAST_NAME VARCHAR) SPLIt ON (1,2)");
+ String ddl = "CREATE LOCAL INDEX TABLE6_IDX ON TABLE6 "
+ + " (FIRST_NAME ASC)";
+ stmt.execute(ddl);
+ ddl = "CREATE LOCAL INDEX TABLE6_IDX2 ON TABLE6 " + " (LAST_NAME ASC)";
+ stmt.execute(ddl);
+
+ FileSystem fs = FileSystem.get(getUtility().getConfiguration());
+ FSDataOutputStream outputStream = fs.create(new Path("/tmp/input3.csv"));
+ PrintWriter printWriter = new PrintWriter(outputStream);
+ printWriter.println("1,FirstName 1:LastName 1");
+ printWriter.println("2,FirstName 2:LastName 2");
+ printWriter.close();
+
+ RegexBulkLoadTool regexBulkLoadTool = new RegexBulkLoadTool();
+ regexBulkLoadTool.setConf(getUtility().getConfiguration());
+ int exitCode = regexBulkLoadTool.run(new String[] {
+ "--input", "/tmp/input3.csv",
+ "--table", "table6",
+ "--regex", "([^,]*),([^:]*):([^,]*)",
+ "--zookeeper", zkQuorum});
+ assertEquals(0, exitCode);
+
+ ResultSet rs = stmt.executeQuery("SELECT id, FIRST_NAME FROM TABLE6 where first_name='FirstName 2'");
+ assertTrue(rs.next());
+ assertEquals(2, rs.getInt(1));
+ assertEquals("FirstName 2", rs.getString(2));
+
+ rs.close();
+ stmt.close();
+ }
+
+ @Test
+ public void testImportOneIndexTable() throws Exception {
+ testImportOneIndexTable("TABLE4", false);
+ }
+
+ @Test
+ public void testImportOneLocalIndexTable() throws Exception {
+ testImportOneIndexTable("TABLE5", true);
+ }
+
+ public void testImportOneIndexTable(String tableName, boolean localIndex) throws Exception {
+
+ String indexTableName = String.format("%s_IDX", tableName);
+ Statement stmt = conn.createStatement();
+ stmt.execute("CREATE TABLE " + tableName + "(ID INTEGER NOT NULL PRIMARY KEY, "
+ + "FIRST_NAME VARCHAR, LAST_NAME VARCHAR)");
+ String ddl =
+ "CREATE " + (localIndex ? "LOCAL" : "") + " INDEX " + indexTableName + " ON "
+ + tableName + "(FIRST_NAME ASC)";
+ stmt.execute(ddl);
+
+ FileSystem fs = FileSystem.get(getUtility().getConfiguration());
+ FSDataOutputStream outputStream = fs.create(new Path("/tmp/input4.csv"));
+ PrintWriter printWriter = new PrintWriter(outputStream);
+ printWriter.println("1,FirstName 1,LastName 1");
+ printWriter.println("2,FirstName 2,LastName 2");
+ printWriter.close();
+
+ RegexBulkLoadTool regexBulkLoadTool = new RegexBulkLoadTool();
+ regexBulkLoadTool.setConf(getUtility().getConfiguration());
+ int exitCode = regexBulkLoadTool.run(new String[] {
+ "--input", "/tmp/input4.csv",
+ "--table", tableName,
+ "--regex", "([^,]*),([^,]*),([^,]*)",
+ "--index-table", indexTableName,
+ "--zookeeper", zkQuorum });
+ assertEquals(0, exitCode);
+
+ ResultSet rs = stmt.executeQuery("SELECT * FROM " + tableName);
+ assertFalse(rs.next());
+ rs = stmt.executeQuery("SELECT FIRST_NAME FROM " + tableName + " where FIRST_NAME='FirstName 1'");
+ assertTrue(rs.next());
+ assertEquals("FirstName 1", rs.getString(1));
+
+ rs.close();
+ stmt.close();
+ }
+
+ @Test
+ public void testInvalidArguments() {
+ String tableName = "TABLE8";
+ RegexBulkLoadTool regexBulkLoadTool = new RegexBulkLoadTool();
+ regexBulkLoadTool.setConf(getUtility().getConfiguration());
+ try {
+ regexBulkLoadTool.run(new String[] {
+ "--input", "/tmp/input4.csv",
+ "--table", tableName,
+ "--regex", "([^,]*),([^,]*),([^,]*)",
+ "--zookeeper", zkQuorum });
+ fail(String.format("Table %s not created, hence should fail",tableName));
+ } catch (Exception ex) {
+ assertTrue(ex instanceof IllegalArgumentException);
+ assertTrue(ex.getMessage().contains(String.format("Table %s not found", tableName)));
+ }
+ }
+
+ @Test
+ public void testAlreadyExistsOutputPath() {
+ String tableName = "TABLE9";
+ String outputPath = "/tmp/output/tabl9";
+ try {
+ Statement stmt = conn.createStatement();
+ stmt.execute("CREATE TABLE " + tableName + "(ID INTEGER NOT NULL PRIMARY KEY, "
+ + "FIRST_NAME VARCHAR, LAST_NAME VARCHAR)");
+
+ FileSystem fs = FileSystem.get(getUtility().getConfiguration());
+ fs.create(new Path(outputPath));
+ FSDataOutputStream outputStream = fs.create(new Path("/tmp/input9.csv"));
+ PrintWriter printWriter = new PrintWriter(outputStream);
+ printWriter.println("1,FirstName 1,LastName 1");
+ printWriter.println("2,FirstName 2,LastName 2");
+ printWriter.close();
+
+ RegexBulkLoadTool regexBulkLoadTool = new RegexBulkLoadTool();
+ regexBulkLoadTool.setConf(getUtility().getConfiguration());
+ regexBulkLoadTool.run(new String[] {
+ "--input", "/tmp/input9.csv",
+ "--output", outputPath,
+ "--table", tableName,
+ "--regex", "([^,]*),([^,]*),([^,]*)",
+ "--zookeeper", zkQuorum });
+
+ fail(String.format("Output path %s already exists. hence, should fail",outputPath));
+ } catch (Exception ex) {
+ assertTrue(ex instanceof FileAlreadyExistsException);
+ }
+ }
+
+ @Test
+ public void testInvalidRegex() throws Exception {
+ Statement stmt = conn.createStatement();
+ stmt.execute("CREATE TABLE TABLE10 (ID INTEGER NOT NULL PRIMARY KEY, " +
+ "NAME VARCHAR, NAMES VARCHAR ARRAY, FLAG BOOLEAN)");
+
+ FileSystem fs = FileSystem.get(getUtility().getConfiguration());
+ FSDataOutputStream outputStream = fs.create(new Path("/tmp/input10.csv"));
+ PrintWriter printWriter = new PrintWriter(outputStream);
+ printWriter.println("1|Name 1a;Name 1b,true");
+ printWriter.println("2|Name 2a;Name 2b");
+ printWriter.close();
+
+ RegexBulkLoadTool regexBulkLoadTool = new RegexBulkLoadTool();
+ regexBulkLoadTool.setConf(getUtility().getConfiguration());
+ int exitCode = regexBulkLoadTool.run(new String[] {
+ "--input", "/tmp/input10.csv",
+ "--table", "table10",
+ "--zookeeper", zkQuorum,
+ "--array-delimiter", ";",
+ "--regex", "([^|]*)\\|([^,]*),([^,]*)",
+ "--import-columns", "ID,NAMES,FLAG"});
+ assertEquals(-1, exitCode);
+ stmt.close();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d18da38a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/RegexBulkLoadTool.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/RegexBulkLoadTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/RegexBulkLoadTool.java
new file mode 100644
index 0000000..94544c9
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/RegexBulkLoadTool.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.phoenix.util.ColumnInfo;
+
+/**
+ * A tool for running MapReduce-based ingests of input data based on regex.
+ * Lists are converted into typed ARRAYS.
+ */
+public class RegexBulkLoadTool extends AbstractBulkLoadTool {
+
+ static final Option REGEX_OPT = new Option("r", "regex", true, "Input regex String, defaults is (.*)");
+ static final Option ARRAY_DELIMITER_OPT = new Option("a", "array-delimiter", true, "Array element delimiter (optional), defaults is ','");
+
+ @Override
+ protected Options getOptions() {
+ Options options = super.getOptions();
+ options.addOption(REGEX_OPT);
+ options.addOption(ARRAY_DELIMITER_OPT);
+ return options;
+ }
+
+ @Override
+ protected void configureOptions(CommandLine cmdLine, List<ColumnInfo> importColumns,
+ Configuration conf) throws SQLException {
+ if (cmdLine.hasOption(REGEX_OPT.getOpt())) {
+ String regexString = cmdLine.getOptionValue(REGEX_OPT.getOpt());
+ conf.set(RegexToKeyValueMapper.REGEX_CONFKEY, regexString);
+ }
+
+ if (cmdLine.hasOption(ARRAY_DELIMITER_OPT.getOpt())) {
+ String arraySeparator = cmdLine.getOptionValue(ARRAY_DELIMITER_OPT.getOpt());
+ conf.set(RegexToKeyValueMapper.ARRAY_DELIMITER_CONFKEY, arraySeparator);
+ }
+ }
+
+ @Override
+ protected void setupJob(Job job) {
+ // Allow overriding the job jar setting by using a -D system property at startup
+ if (job.getJar() == null) {
+ job.setJarByClass(RegexToKeyValueMapper.class);
+ }
+ job.setMapperClass(RegexToKeyValueMapper.class);
+ }
+
+ public static void main(String[] args) throws Exception {
+ ToolRunner.run(new RegexBulkLoadTool(), args);
+ }
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d18da38a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/RegexToKeyValueMapper.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/RegexToKeyValueMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/RegexToKeyValueMapper.java
new file mode 100644
index 0000000..f63923d
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/RegexToKeyValueMapper.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.phoenix.schema.types.PDataType;
+import org.apache.phoenix.schema.types.PTimestamp;
+import org.apache.phoenix.util.ColumnInfo;
+import org.apache.phoenix.util.UpsertExecutor;
+import org.apache.phoenix.util.regex.RegexUpsertExecutor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+/**
+ * MapReduce mapper that converts input lines into KeyValues based on the Regex that can be written to HFiles.
+ * <p/>
+ * KeyValues are produced by executing UPSERT statements on a Phoenix connection and then
+ * extracting the created KeyValues and rolling back the statement execution before it is
+ * committed to HBase.
+ */
+public class RegexToKeyValueMapper extends FormatToBytesWritableMapper<Map<?, ?>> {
+
+ protected static final Logger LOG = LoggerFactory.getLogger(RegexToKeyValueMapper.class);
+
+ /** Configuration key for the regex */
+ public static final String REGEX_CONFKEY = "phoenix.mapreduce.import.regex";
+
+ /** Configuration key for the array element delimiter for input arrays */
+ public static final String ARRAY_DELIMITER_CONFKEY = "phoenix.mapreduce.import.arraydelimiter";
+
+ /** Configuration key for default array delimiter */
+ public static final String ARRAY_DELIMITER_DEFAULT = ",";
+
+ private LineParser<Map<?, ?>> lineParser;
+
+ @Override
+ protected LineParser<Map<?, ?>> getLineParser() {
+ return lineParser;
+ }
+
+ @Override
+ protected void setup(Context context) throws IOException, InterruptedException {
+ super.setup(context);
+ }
+
+ @VisibleForTesting
+ @Override
+ protected UpsertExecutor<Map<?, ?>, ?> buildUpsertExecutor(Configuration conf) {
+ String tableName = conf.get(TABLE_NAME_CONFKEY);
+ Preconditions.checkNotNull(tableName, "table name is not configured");
+
+ String regex = conf.get(REGEX_CONFKEY);
+ Preconditions.checkNotNull(regex, "regex is not configured");
+
+ List<ColumnInfo> columnInfoList = buildColumnInfoList(conf);
+
+ String arraySeparator = conf.get(ARRAY_DELIMITER_CONFKEY, ARRAY_DELIMITER_DEFAULT);
+
+ lineParser = new RegexLineParser(regex, columnInfoList, arraySeparator);
+
+ return new RegexUpsertExecutor(conn, tableName, columnInfoList, upsertListener);
+ }
+
+ /**
+ * Parses a single input line with regex, returning a {@link Map} objects.
+ */
+ @VisibleForTesting
+ static class RegexLineParser implements LineParser<Map<?, ?>> {
+ private Pattern inputPattern;
+ private List<ColumnInfo> columnInfoList;
+ private String arraySeparator;
+
+ public RegexLineParser(String regex, List<ColumnInfo> columnInfo, String arraySep) {
+ inputPattern = Pattern.compile(regex);
+ columnInfoList = columnInfo;
+ arraySeparator = arraySep;
+ }
+
+ /**
+ * based on the regex and input, providing mapping between schema and input
+ */
+ @Override
+ public Map<?, ?> parse(String input) throws IOException {
+ Map<String, Object> data = new HashMap<>();
+ Matcher m = inputPattern.matcher(input);
+ if (m.groupCount() != columnInfoList.size()) {
+ LOG.debug(String.format("based on the regex and input, input fileds %s size doesn't match the table columns %s size", m.groupCount(), columnInfoList.size()));
+ return data;
+ }
+
+ if (m.find( )) {
+ for (int i = 0; i < columnInfoList.size(); i++) {
+ ColumnInfo columnInfo = columnInfoList.get(i);
+ String colName = columnInfo.getColumnName();
+ String value = m.group(i + 1);
+ PDataType pDataType = PDataType.fromTypeId(columnInfo.getSqlType());
+ if (pDataType.isArrayType()) {
+ data.put(colName, Arrays.asList(value.split(arraySeparator)));
+ } else if (pDataType.isCoercibleTo(PTimestamp.INSTANCE)) {
+ data.put(colName, value);
+ } else {
+ data.put(colName, pDataType.toObject(value));
+ }
+ }
+ }
+ return data;
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d18da38a/phoenix-core/src/main/java/org/apache/phoenix/util/regex/RegexUpsertExecutor.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/regex/RegexUpsertExecutor.java b/phoenix-core/src/main/java/org/apache/phoenix/util/regex/RegexUpsertExecutor.java
new file mode 100644
index 0000000..0388d9c
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/regex/RegexUpsertExecutor.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.util.regex;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.phoenix.util.ColumnInfo;
+import org.apache.phoenix.util.UpsertExecutor;
+import org.apache.phoenix.util.json.JsonUpsertExecutor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/** {@link UpsertExecutor} over {@link Map} objects, convert input record into {@link Map} objects by using regex. */
+public class RegexUpsertExecutor extends JsonUpsertExecutor {
+
+ protected static final Logger LOG = LoggerFactory.getLogger(RegexUpsertExecutor.class);
+
+ /** Testing constructor. Do not use in prod. */
+ @VisibleForTesting
+ protected RegexUpsertExecutor(Connection conn, List<ColumnInfo> columnInfoList,
+ PreparedStatement stmt, UpsertListener<Map<?, ?>> upsertListener) {
+ super(conn, columnInfoList, stmt, upsertListener);
+ }
+
+ public RegexUpsertExecutor(Connection conn, String tableName, List<ColumnInfo> columnInfoList,
+ UpsertExecutor.UpsertListener<Map<?, ?>> upsertListener) {
+ super(conn, tableName, columnInfoList, upsertListener);
+ }
+
+ @Override
+ protected void execute(Map<?, ?> record) {
+ int fieldIndex = 0;
+ String colName = null;
+ try {
+ if (record.size() < conversionFunctions.size()) {
+ String message = String.format("Input record does not have enough values based on regex (has %d, but needs %d)",
+ record.size(), conversionFunctions.size());
+ throw new IllegalArgumentException(message);
+ }
+ for (fieldIndex = 0; fieldIndex < conversionFunctions.size(); fieldIndex++) {
+ colName = columnInfos.get(fieldIndex).getColumnName();
+ Object sqlValue = conversionFunctions.get(fieldIndex).apply(record.get(colName));
+ if (sqlValue != null) {
+ preparedStatement.setObject(fieldIndex + 1, sqlValue);
+ } else {
+ preparedStatement.setNull(fieldIndex + 1, dataTypes.get(fieldIndex).getSqlType());
+ }
+ }
+ preparedStatement.execute();
+ upsertListener.upsertDone(++upsertCount);
+ } catch (Exception e) {
+ if (LOG.isDebugEnabled()) {
+ // Even though this is an error we only log it with debug logging because we're notifying the
+ // listener, and it can do its own logging if needed
+ LOG.debug("Error on record " + record + ", fieldIndex " + fieldIndex + ", colName " + colName, e);
+ }
+ upsertListener.errorOnRecord(record, new Exception("fieldIndex: " + fieldIndex + ", colName " + colName, e));
+ }
+ }
+}
\ No newline at end of file