You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by gr...@apache.org on 2017/02/20 07:55:14 UTC

phoenix git commit: PHOENIX-3538 Regex bulk loader

Repository: phoenix
Updated Branches:
  refs/heads/master b5cf5aa2c -> d18da38af


PHOENIX-3538 Regex bulk loader

Add bulk loader which parses input based on a regular expression.

Contributed by kalyanhadooptraining@gmail.com


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/d18da38a
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/d18da38a
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/d18da38a

Branch: refs/heads/master
Commit: d18da38afa0d7bbc0221f6472bc3b037edc6e3d4
Parents: b5cf5aa
Author: Gabriel Reid <ga...@ngdata.com>
Authored: Sun Feb 19 20:28:14 2017 +0100
Committer: Gabriel Reid <ga...@ngdata.com>
Committed: Mon Feb 20 08:17:57 2017 +0100

----------------------------------------------------------------------
 .../phoenix/end2end/RegexBulkLoadToolIT.java    | 371 +++++++++++++++++++
 .../phoenix/mapreduce/RegexBulkLoadTool.java    |  74 ++++
 .../mapreduce/RegexToKeyValueMapper.java        | 135 +++++++
 .../phoenix/util/regex/RegexUpsertExecutor.java |  80 ++++
 4 files changed, 660 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/d18da38a/phoenix-core/src/it/java/org/apache/phoenix/end2end/RegexBulkLoadToolIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/RegexBulkLoadToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/RegexBulkLoadToolIT.java
new file mode 100644
index 0000000..47b0db7
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/RegexBulkLoadToolIT.java
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import static org.apache.phoenix.query.QueryServices.DATE_FORMAT_ATTRIB;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.PrintWriter;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileAlreadyExistsException;
+import org.apache.phoenix.mapreduce.RegexBulkLoadTool;
+import org.apache.phoenix.util.DateUtil;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.TestUtil;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class RegexBulkLoadToolIT extends BaseOwnClusterIT {
+
+    private static Connection conn;
+    private static String zkQuorum;
+
+    @BeforeClass
+    public static void doSetup() throws Exception {
+        setUpTestDriver(ReadOnlyProps.EMPTY_PROPS);
+        zkQuorum = TestUtil.LOCALHOST + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + getUtility().getZkCluster().getClientPort();
+        conn = DriverManager.getConnection(getUrl());
+    }
+
+    @Test
+    public void testBasicImport() throws Exception {
+
+        Statement stmt = conn.createStatement();
+        stmt.execute("CREATE TABLE S.TABLE1 (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, T DATE) SPLIT ON (1,2)");
+
+        FileSystem fs = FileSystem.get(getUtility().getConfiguration());
+        FSDataOutputStream outputStream = fs.create(new Path("/tmp/input1.csv"));
+        PrintWriter printWriter = new PrintWriter(outputStream);
+        printWriter.println("1,Name 1,1970/01/01");
+        printWriter.println("2,Name 2,1970/01/02");
+        printWriter.close();
+
+        RegexBulkLoadTool regexBulkLoadTool = new RegexBulkLoadTool();
+        regexBulkLoadTool.setConf(getUtility().getConfiguration());
+        regexBulkLoadTool.getConf().set(DATE_FORMAT_ATTRIB,"yyyy/MM/dd");
+        int exitCode = regexBulkLoadTool.run(new String[] {
+                "--input", "/tmp/input1.csv",
+                "--table", "table1",
+                "--schema", "s",
+                "--regex", "([^,]*),([^,]*),([^,]*)",
+                "--zookeeper", zkQuorum});
+        assertEquals(0, exitCode);
+
+        ResultSet rs = stmt.executeQuery("SELECT id, name, t FROM s.table1 ORDER BY id");
+        assertTrue(rs.next());
+        assertEquals(1, rs.getInt(1));
+        assertEquals("Name 1", rs.getString(2));
+        assertEquals(DateUtil.parseDate("1970-01-01"), rs.getDate(3));
+        assertTrue(rs.next());
+        assertEquals(2, rs.getInt(1));
+        assertEquals("Name 2", rs.getString(2));
+        assertEquals(DateUtil.parseDate("1970-01-02"), rs.getDate(3));
+        assertFalse(rs.next());
+
+        rs.close();
+        stmt.close();
+    }
+
+    @Test
+    public void testFullOptionImport() throws Exception {
+
+        Statement stmt = conn.createStatement();
+        stmt.execute("CREATE TABLE TABLE2 (ID INTEGER NOT NULL PRIMARY KEY, " +
+                "NAME VARCHAR, NAMES VARCHAR ARRAY, FLAG BOOLEAN)");
+
+        FileSystem fs = FileSystem.get(getUtility().getConfiguration());
+        FSDataOutputStream outputStream = fs.create(new Path("/tmp/input2.csv"));
+        PrintWriter printWriter = new PrintWriter(outputStream);
+        printWriter.println("1|Name 1a;Name 1b,true");
+        printWriter.println("2|Name 2a;Name 2b,false");
+        printWriter.close();
+
+        RegexBulkLoadTool regexBulkLoadTool = new RegexBulkLoadTool();
+        regexBulkLoadTool.setConf(getUtility().getConfiguration());
+        int exitCode = regexBulkLoadTool.run(new String[] {
+                "--input", "/tmp/input2.csv",
+                "--table", "table2",
+                "--zookeeper", zkQuorum,
+                "--array-delimiter", ";",
+                "--regex", "([^|]*)\\|([^,]*),([^,]*)",
+                "--import-columns", "ID,NAMES,FLAG"});
+        assertEquals(0, exitCode);
+
+        ResultSet rs = stmt.executeQuery("SELECT id, names FROM table2 ORDER BY id");
+        assertTrue(rs.next());
+        assertEquals(1, rs.getInt(1));
+        assertArrayEquals(new Object[] { "Name 1a", "Name 1b" }, (Object[]) rs.getArray(2).getArray());
+        assertTrue(rs.next());
+        assertEquals(2, rs.getInt(1));
+        assertArrayEquals(new Object[] { "Name 2a", "Name 2b" }, (Object[]) rs.getArray(2).getArray());
+        assertFalse(rs.next());
+
+        rs.close();
+        stmt.close();
+    }
+
+    @Test
+    public void testMultipleInputFiles() throws Exception {
+
+        Statement stmt = conn.createStatement();
+        stmt.execute("CREATE TABLE TABLE7 (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, T DATE) SPLIT ON (1,2)");
+
+        FileSystem fs = FileSystem.get(getUtility().getConfiguration());
+        FSDataOutputStream outputStream = fs.create(new Path("/tmp/input1.csv"));
+        PrintWriter printWriter = new PrintWriter(outputStream);
+        printWriter.println("1,Name 1,1970/01/01");
+        printWriter.close();
+        outputStream = fs.create(new Path("/tmp/input2.csv"));
+        printWriter = new PrintWriter(outputStream);
+        printWriter.println("2,Name 2,1970/01/02");
+        printWriter.close();
+
+        RegexBulkLoadTool regexBulkLoadTool = new RegexBulkLoadTool();
+        regexBulkLoadTool.setConf(getUtility().getConfiguration());
+        regexBulkLoadTool.getConf().set(DATE_FORMAT_ATTRIB,"yyyy/MM/dd");
+        int exitCode = regexBulkLoadTool.run(new String[] {
+            "--input", "/tmp/input1.csv,/tmp/input2.csv",
+            "--table", "table7",
+            "--regex", "([^,]*),([^,]*),([^,]*)",
+            "--zookeeper", zkQuorum});
+        assertEquals(0, exitCode);
+
+        ResultSet rs = stmt.executeQuery("SELECT id, name, t FROM table7 ORDER BY id");
+        assertTrue(rs.next());
+        assertEquals(1, rs.getInt(1));
+        assertEquals("Name 1", rs.getString(2));
+        assertEquals(DateUtil.parseDate("1970-01-01"), rs.getDate(3));
+        assertTrue(rs.next());
+        assertEquals(2, rs.getInt(1));
+        assertEquals("Name 2", rs.getString(2));
+        assertEquals(DateUtil.parseDate("1970-01-02"), rs.getDate(3));
+        assertFalse(rs.next());
+
+        rs.close();
+        stmt.close();
+    }
+
+    @Test
+    public void testImportWithIndex() throws Exception {
+
+        Statement stmt = conn.createStatement();
+        stmt.execute("CREATE TABLE TABLE3 (ID INTEGER NOT NULL PRIMARY KEY, " +
+            "FIRST_NAME VARCHAR, LAST_NAME VARCHAR)");
+        String ddl = "CREATE INDEX TABLE3_IDX ON TABLE3 "
+                + " (FIRST_NAME ASC)"
+                + " INCLUDE (LAST_NAME)";
+        stmt.execute(ddl);
+        
+        FileSystem fs = FileSystem.get(getUtility().getConfiguration());
+        FSDataOutputStream outputStream = fs.create(new Path("/tmp/input3.csv"));
+        PrintWriter printWriter = new PrintWriter(outputStream);
+        printWriter.println("1,FirstName 1,LastName 1");
+        printWriter.println("2,FirstName 2,LastName 2");
+        printWriter.close();
+
+        RegexBulkLoadTool regexBulkLoadTool = new RegexBulkLoadTool();
+        regexBulkLoadTool.setConf(getUtility().getConfiguration());
+        int exitCode = regexBulkLoadTool.run(new String[] {
+                "--input", "/tmp/input3.csv",
+                "--table", "table3",
+                "--regex", "([^,]*),([^,]*),([^,]*)",
+                "--zookeeper", zkQuorum});
+        assertEquals(0, exitCode);
+
+        ResultSet rs = stmt.executeQuery("SELECT id, FIRST_NAME FROM TABLE3 where first_name='FirstName 2'");
+        assertTrue(rs.next());
+        assertEquals(2, rs.getInt(1));
+        assertEquals("FirstName 2", rs.getString(2));
+
+        rs.close();
+        stmt.close();
+    }
+
+    @Test
+    public void testImportWithLocalIndex() throws Exception {
+
+        Statement stmt = conn.createStatement();
+        stmt.execute("CREATE TABLE TABLE6 (ID INTEGER NOT NULL PRIMARY KEY, " +
+                "FIRST_NAME VARCHAR, LAST_NAME VARCHAR) SPLIt ON (1,2)");
+        String ddl = "CREATE LOCAL INDEX TABLE6_IDX ON TABLE6 "
+                + " (FIRST_NAME ASC)";
+        stmt.execute(ddl);
+        ddl = "CREATE LOCAL INDEX TABLE6_IDX2 ON TABLE6 " + " (LAST_NAME ASC)";
+        stmt.execute(ddl);
+
+        FileSystem fs = FileSystem.get(getUtility().getConfiguration());
+        FSDataOutputStream outputStream = fs.create(new Path("/tmp/input3.csv"));
+        PrintWriter printWriter = new PrintWriter(outputStream);
+        printWriter.println("1,FirstName 1:LastName 1");
+        printWriter.println("2,FirstName 2:LastName 2");
+        printWriter.close();
+
+        RegexBulkLoadTool regexBulkLoadTool = new RegexBulkLoadTool();
+        regexBulkLoadTool.setConf(getUtility().getConfiguration());
+        int exitCode = regexBulkLoadTool.run(new String[] {
+                "--input", "/tmp/input3.csv",
+                "--table", "table6",
+                "--regex", "([^,]*),([^:]*):([^,]*)",
+                "--zookeeper", zkQuorum});
+        assertEquals(0, exitCode);
+
+        ResultSet rs = stmt.executeQuery("SELECT id, FIRST_NAME FROM TABLE6 where first_name='FirstName 2'");
+        assertTrue(rs.next());
+        assertEquals(2, rs.getInt(1));
+        assertEquals("FirstName 2", rs.getString(2));
+
+        rs.close();
+        stmt.close();
+    }
+
+    @Test
+    public void testImportOneIndexTable() throws Exception {
+        testImportOneIndexTable("TABLE4", false);
+    }
+
+    @Test
+    public void testImportOneLocalIndexTable() throws Exception {
+        testImportOneIndexTable("TABLE5", true);
+    }
+
+    public void testImportOneIndexTable(String tableName, boolean localIndex) throws Exception {
+
+        String indexTableName = String.format("%s_IDX", tableName);
+        Statement stmt = conn.createStatement();
+        stmt.execute("CREATE TABLE " + tableName + "(ID INTEGER NOT NULL PRIMARY KEY, "
+                + "FIRST_NAME VARCHAR, LAST_NAME VARCHAR)");
+        String ddl =
+                "CREATE " + (localIndex ? "LOCAL" : "") + " INDEX " + indexTableName + " ON "
+                        + tableName + "(FIRST_NAME ASC)";
+        stmt.execute(ddl);
+
+        FileSystem fs = FileSystem.get(getUtility().getConfiguration());
+        FSDataOutputStream outputStream = fs.create(new Path("/tmp/input4.csv"));
+        PrintWriter printWriter = new PrintWriter(outputStream);
+        printWriter.println("1,FirstName 1,LastName 1");
+        printWriter.println("2,FirstName 2,LastName 2");
+        printWriter.close();
+
+        RegexBulkLoadTool regexBulkLoadTool = new RegexBulkLoadTool();
+        regexBulkLoadTool.setConf(getUtility().getConfiguration());
+        int exitCode = regexBulkLoadTool.run(new String[] {
+                "--input", "/tmp/input4.csv",
+                "--table", tableName,
+                "--regex", "([^,]*),([^,]*),([^,]*)",
+                "--index-table", indexTableName,
+                "--zookeeper", zkQuorum });
+        assertEquals(0, exitCode);
+
+        ResultSet rs = stmt.executeQuery("SELECT * FROM " + tableName);
+        assertFalse(rs.next());
+        rs = stmt.executeQuery("SELECT FIRST_NAME FROM " + tableName + " where FIRST_NAME='FirstName 1'");
+        assertTrue(rs.next());
+        assertEquals("FirstName 1", rs.getString(1));
+
+        rs.close();
+        stmt.close();
+    }
+    
+    @Test
+    public void testInvalidArguments() {
+        String tableName = "TABLE8";
+        RegexBulkLoadTool regexBulkLoadTool = new RegexBulkLoadTool();
+        regexBulkLoadTool.setConf(getUtility().getConfiguration());
+        try {
+            regexBulkLoadTool.run(new String[] {
+                "--input", "/tmp/input4.csv",
+                "--table", tableName,
+                "--regex", "([^,]*),([^,]*),([^,]*)",
+                "--zookeeper", zkQuorum });
+            fail(String.format("Table %s not created, hence should fail",tableName));
+        } catch (Exception ex) {
+            assertTrue(ex instanceof IllegalArgumentException); 
+            assertTrue(ex.getMessage().contains(String.format("Table %s not found", tableName)));
+        }
+    }
+    
+    @Test
+    public void testAlreadyExistsOutputPath() {
+        String tableName = "TABLE9";
+        String outputPath = "/tmp/output/tabl9";
+        try {
+            Statement stmt = conn.createStatement();
+            stmt.execute("CREATE TABLE " + tableName + "(ID INTEGER NOT NULL PRIMARY KEY, "
+                    + "FIRST_NAME VARCHAR, LAST_NAME VARCHAR)");
+            
+            FileSystem fs = FileSystem.get(getUtility().getConfiguration());
+            fs.create(new Path(outputPath));
+            FSDataOutputStream outputStream = fs.create(new Path("/tmp/input9.csv"));
+            PrintWriter printWriter = new PrintWriter(outputStream);
+            printWriter.println("1,FirstName 1,LastName 1");
+            printWriter.println("2,FirstName 2,LastName 2");
+            printWriter.close();
+            
+            RegexBulkLoadTool regexBulkLoadTool = new RegexBulkLoadTool();
+            regexBulkLoadTool.setConf(getUtility().getConfiguration());
+            regexBulkLoadTool.run(new String[] {
+                "--input", "/tmp/input9.csv",
+                "--output", outputPath,
+                "--table", tableName,
+                "--regex", "([^,]*),([^,]*),([^,]*)",
+                "--zookeeper", zkQuorum });
+            
+            fail(String.format("Output path %s already exists. hence, should fail",outputPath));
+        } catch (Exception ex) {
+            assertTrue(ex instanceof FileAlreadyExistsException); 
+        }
+    }
+    
+    @Test
+    public void testInvalidRegex() throws Exception {
+        Statement stmt = conn.createStatement();
+        stmt.execute("CREATE TABLE TABLE10 (ID INTEGER NOT NULL PRIMARY KEY, " +
+                "NAME VARCHAR, NAMES VARCHAR ARRAY, FLAG BOOLEAN)");
+
+        FileSystem fs = FileSystem.get(getUtility().getConfiguration());
+        FSDataOutputStream outputStream = fs.create(new Path("/tmp/input10.csv"));
+        PrintWriter printWriter = new PrintWriter(outputStream);
+        printWriter.println("1|Name 1a;Name 1b,true");
+        printWriter.println("2|Name 2a;Name 2b");
+        printWriter.close();
+
+        RegexBulkLoadTool regexBulkLoadTool = new RegexBulkLoadTool();
+        regexBulkLoadTool.setConf(getUtility().getConfiguration());
+        int exitCode = regexBulkLoadTool.run(new String[] {
+                "--input", "/tmp/input10.csv",
+                "--table", "table10",
+                "--zookeeper", zkQuorum,
+                "--array-delimiter", ";",
+                "--regex", "([^|]*)\\|([^,]*),([^,]*)",
+                "--import-columns", "ID,NAMES,FLAG"});
+        assertEquals(-1, exitCode);
+        stmt.close();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d18da38a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/RegexBulkLoadTool.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/RegexBulkLoadTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/RegexBulkLoadTool.java
new file mode 100644
index 0000000..94544c9
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/RegexBulkLoadTool.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.phoenix.util.ColumnInfo;
+
+/**
+ * A tool for running MapReduce-based ingests of input data based on regex.
+ * Lists are converted into typed ARRAYS.
+ */
+public class RegexBulkLoadTool extends AbstractBulkLoadTool {
+
+    static final Option REGEX_OPT = new Option("r", "regex", true, "Input regex String, defaults is (.*)");
+    static final Option ARRAY_DELIMITER_OPT = new Option("a", "array-delimiter", true, "Array element delimiter (optional), defaults is ','");
+
+    @Override
+    protected Options getOptions() {
+        Options options = super.getOptions();
+        options.addOption(REGEX_OPT);
+        options.addOption(ARRAY_DELIMITER_OPT);
+        return options;
+    }
+
+    @Override
+    protected void configureOptions(CommandLine cmdLine, List<ColumnInfo> importColumns,
+                                         Configuration conf) throws SQLException {
+    	if (cmdLine.hasOption(REGEX_OPT.getOpt())) {
+            String regexString = cmdLine.getOptionValue(REGEX_OPT.getOpt());
+            conf.set(RegexToKeyValueMapper.REGEX_CONFKEY, regexString);
+        }
+    	
+    	if (cmdLine.hasOption(ARRAY_DELIMITER_OPT.getOpt())) {
+            String arraySeparator = cmdLine.getOptionValue(ARRAY_DELIMITER_OPT.getOpt());
+            conf.set(RegexToKeyValueMapper.ARRAY_DELIMITER_CONFKEY, arraySeparator);
+        }
+    }
+
+    @Override
+    protected void setupJob(Job job) {
+        // Allow overriding the job jar setting by using a -D system property at startup
+        if (job.getJar() == null) {
+            job.setJarByClass(RegexToKeyValueMapper.class);
+        }
+        job.setMapperClass(RegexToKeyValueMapper.class);
+    }
+
+    public static void main(String[] args) throws Exception {
+        ToolRunner.run(new RegexBulkLoadTool(), args);
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d18da38a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/RegexToKeyValueMapper.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/RegexToKeyValueMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/RegexToKeyValueMapper.java
new file mode 100644
index 0000000..f63923d
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/RegexToKeyValueMapper.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.phoenix.schema.types.PDataType;
+import org.apache.phoenix.schema.types.PTimestamp;
+import org.apache.phoenix.util.ColumnInfo;
+import org.apache.phoenix.util.UpsertExecutor;
+import org.apache.phoenix.util.regex.RegexUpsertExecutor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+/**
+ * MapReduce mapper that converts input lines into KeyValues based on the Regex that can be written to HFiles.
+ * <p/>
+ * KeyValues are produced by executing UPSERT statements on a Phoenix connection and then
+ * extracting the created KeyValues and rolling back the statement execution before it is
+ * committed to HBase.
+ */
+public class RegexToKeyValueMapper extends FormatToBytesWritableMapper<Map<?, ?>> {
+
+    protected static final Logger LOG = LoggerFactory.getLogger(RegexToKeyValueMapper.class);
+
+    /** Configuration key for the regex */
+    public static final String REGEX_CONFKEY = "phoenix.mapreduce.import.regex";
+
+    /** Configuration key for the array element delimiter for input arrays */
+    public static final String ARRAY_DELIMITER_CONFKEY = "phoenix.mapreduce.import.arraydelimiter";
+    
+    /** Configuration key for default array delimiter */
+    public static final String ARRAY_DELIMITER_DEFAULT = ",";
+    
+    private LineParser<Map<?, ?>> lineParser;
+    
+    @Override
+    protected  LineParser<Map<?, ?>> getLineParser() {
+        return lineParser;
+    }
+
+    @Override
+    protected void setup(Context context) throws IOException, InterruptedException {
+        super.setup(context);
+    }
+
+    @VisibleForTesting
+    @Override
+    protected UpsertExecutor<Map<?, ?>, ?> buildUpsertExecutor(Configuration conf) {
+        String tableName = conf.get(TABLE_NAME_CONFKEY);
+        Preconditions.checkNotNull(tableName, "table name is not configured");
+        
+        String regex = conf.get(REGEX_CONFKEY);
+        Preconditions.checkNotNull(regex, "regex is not configured");
+        
+        List<ColumnInfo> columnInfoList = buildColumnInfoList(conf);
+        
+        String arraySeparator = conf.get(ARRAY_DELIMITER_CONFKEY, ARRAY_DELIMITER_DEFAULT);
+        
+        lineParser = new RegexLineParser(regex, columnInfoList, arraySeparator);
+
+        return new RegexUpsertExecutor(conn, tableName, columnInfoList, upsertListener);
+    }
+
+    /**
+     * Parses a single input line with regex, returning a {@link Map} objects.
+     */
+    @VisibleForTesting
+    static class RegexLineParser implements LineParser<Map<?, ?>> {
+        private Pattern inputPattern;
+        private List<ColumnInfo> columnInfoList;
+        private String arraySeparator;
+        
+        public RegexLineParser(String regex, List<ColumnInfo> columnInfo, String arraySep) {
+        	inputPattern = Pattern.compile(regex);
+        	columnInfoList = columnInfo;
+        	arraySeparator = arraySep;
+		}
+
+        /**
+         * based on the regex and input, providing mapping between schema and input
+         */
+		@Override
+        public Map<?, ?> parse(String input) throws IOException {
+			Map<String, Object> data = new HashMap<>();
+			Matcher m = inputPattern.matcher(input);
+			if (m.groupCount() != columnInfoList.size()) {
+				LOG.debug(String.format("based on the regex and input, input fileds %s size doesn't match the table columns %s size", m.groupCount(), columnInfoList.size()));
+				return data;
+			}
+			
+			if (m.find( )) {
+				for (int i = 0; i < columnInfoList.size(); i++) {
+					ColumnInfo columnInfo = columnInfoList.get(i);
+					String colName = columnInfo.getColumnName();
+					String value = m.group(i + 1);
+					PDataType pDataType = PDataType.fromTypeId(columnInfo.getSqlType());
+					if (pDataType.isArrayType()) {
+						data.put(colName, Arrays.asList(value.split(arraySeparator)));
+					} else if (pDataType.isCoercibleTo(PTimestamp.INSTANCE)) {
+						data.put(colName, value);
+					} else {
+						data.put(colName, pDataType.toObject(value));
+					}
+				}
+			}
+			return data;
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/d18da38a/phoenix-core/src/main/java/org/apache/phoenix/util/regex/RegexUpsertExecutor.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/regex/RegexUpsertExecutor.java b/phoenix-core/src/main/java/org/apache/phoenix/util/regex/RegexUpsertExecutor.java
new file mode 100644
index 0000000..0388d9c
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/regex/RegexUpsertExecutor.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.util.regex;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.phoenix.util.ColumnInfo;
+import org.apache.phoenix.util.UpsertExecutor;
+import org.apache.phoenix.util.json.JsonUpsertExecutor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/** {@link UpsertExecutor} over {@link Map} objects, convert input record into {@link Map} objects by using regex. */
+public class RegexUpsertExecutor extends JsonUpsertExecutor {
+
+    protected static final Logger LOG = LoggerFactory.getLogger(RegexUpsertExecutor.class);
+
+    /** Testing constructor. Do not use in prod. */
+    @VisibleForTesting
+    protected RegexUpsertExecutor(Connection conn, List<ColumnInfo> columnInfoList,
+            PreparedStatement stmt, UpsertListener<Map<?, ?>> upsertListener) {
+        super(conn, columnInfoList, stmt, upsertListener);
+    }
+
+    public RegexUpsertExecutor(Connection conn, String tableName, List<ColumnInfo> columnInfoList,
+            UpsertExecutor.UpsertListener<Map<?, ?>> upsertListener) {
+        super(conn, tableName, columnInfoList, upsertListener);
+    }
+
+    @Override
+    protected void execute(Map<?, ?> record) {
+        int fieldIndex = 0;
+        String colName = null;
+        try {
+            if (record.size() < conversionFunctions.size()) {
+                String message = String.format("Input record does not have enough values based on regex (has %d, but needs %d)",
+                        record.size(), conversionFunctions.size());
+                throw new IllegalArgumentException(message);
+            }
+            for (fieldIndex = 0; fieldIndex < conversionFunctions.size(); fieldIndex++) {
+                colName = columnInfos.get(fieldIndex).getColumnName();
+                Object sqlValue = conversionFunctions.get(fieldIndex).apply(record.get(colName));
+                if (sqlValue != null) {
+                    preparedStatement.setObject(fieldIndex + 1, sqlValue);
+                } else {
+                    preparedStatement.setNull(fieldIndex + 1, dataTypes.get(fieldIndex).getSqlType());
+                }
+            }
+            preparedStatement.execute();
+            upsertListener.upsertDone(++upsertCount);
+        } catch (Exception e) {
+            if (LOG.isDebugEnabled()) {
+                // Even though this is an error we only log it with debug logging because we're notifying the
+                // listener, and it can do its own logging if needed
+                LOG.debug("Error on record " + record + ", fieldIndex " + fieldIndex + ", colName " + colName, e);
+            }
+            upsertListener.errorOnRecord(record, new Exception("fieldIndex: " + fieldIndex + ", colName " + colName, e));
+        }
+    }
+}
\ No newline at end of file