You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ra...@apache.org on 2015/03/12 04:55:03 UTC

[1/2] phoenix git commit: fix for unescaping the columns

Repository: phoenix
Updated Branches:
  refs/heads/master 80a50c964 -> 2bbbcfdbe


fix for unescaping the columns


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/2bbbcfdb
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/2bbbcfdb
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/2bbbcfdb

Branch: refs/heads/master
Commit: 2bbbcfdbe873e12e3d89a8187f9ad7837e59eddd
Parents: 55f64ea
Author: ravimagham <ra...@apache.org>
Authored: Wed Mar 11 12:48:35 2015 -0700
Committer: ravimagham <ra...@apache.org>
Committed: Wed Mar 11 20:41:36 2015 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/phoenix/util/ColumnInfo.java    |  7 ++++---
 .../main/java/org/apache/phoenix/util/SchemaUtil.java    | 11 +++++++++++
 .../phoenix/pig/util/PhoenixPigSchemaUtilTest.java       |  4 ++--
 3 files changed, 17 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/2bbbcfdb/phoenix-core/src/main/java/org/apache/phoenix/util/ColumnInfo.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ColumnInfo.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ColumnInfo.java
index 46350be..3f94b92 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/ColumnInfo.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ColumnInfo.java
@@ -56,11 +56,12 @@ public class ColumnInfo {
      * @return
      */
     public String getDisplayName() {
-        int index = columnName.indexOf(QueryConstants.NAME_SEPARATOR);
+    	final String unescapedColumnName = SchemaUtil.getUnEscapedFullColumnName(columnName);
+        int index = unescapedColumnName.indexOf(QueryConstants.NAME_SEPARATOR);
         if (index < 0) {
-            return columnName; 
+            return unescapedColumnName; 
         }
-        return columnName.substring(index+1);
+        return unescapedColumnName.substring(index+1).trim();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2bbbcfdb/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
index 1d986c6..2a1d3ff 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
@@ -681,4 +681,15 @@ public class SchemaUtil {
         checkArgument(!isNullOrEmpty(columnName), "Column name cannot be null or empty");
         return columnFamilyName == null ? ("\"" + columnName + "\"") : ("\"" + columnFamilyName + "\"" + QueryConstants.NAME_SEPARATOR + "\"" + columnName + "\"");
     }
+    
+    /**
+     * Replaces all occurrences of {@link #ESCAPE_CHARACTER} with an empty character. 
+     * @param fullColumnName
+     * @return 
+     */
+    public static String getUnEscapedFullColumnName(String fullColumnName) {
+        checkArgument(!isNullOrEmpty(fullColumnName), "Column name cannot be null or empty");
+        fullColumnName = fullColumnName.replaceAll(ESCAPE_CHARACTER, "");
+       	return fullColumnName.trim();
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2bbbcfdb/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/PhoenixPigSchemaUtilTest.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/PhoenixPigSchemaUtilTest.java b/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/PhoenixPigSchemaUtilTest.java
index abfb442..24d27b1 100644
--- a/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/PhoenixPigSchemaUtilTest.java
+++ b/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/PhoenixPigSchemaUtilTest.java
@@ -66,10 +66,10 @@ public class PhoenixPigSchemaUtilTest {
         
         // expected schema.
         final ResourceFieldSchema[] fields = new ResourceFieldSchema[2];
-        fields[0] = new ResourceFieldSchema().setName(SchemaUtil.getEscapedFullColumnName("ID"))
+        fields[0] = new ResourceFieldSchema().setName("ID")
                                                 .setType(DataType.LONG);
 
-        fields[1] = new ResourceFieldSchema().setName(SchemaUtil.getEscapedFullColumnName("NAME"))
+        fields[1] = new ResourceFieldSchema().setName("NAME")
                                                 .setType(DataType.CHARARRAY);
         final ResourceSchema expected = new ResourceSchema().setFields(fields);
         


[2/2] phoenix git commit: PHOENIX-1609-4.0

Posted by ra...@apache.org.
PHOENIX-1609-4.0


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/55f64eaf
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/55f64eaf
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/55f64eaf

Branch: refs/heads/master
Commit: 55f64eaf3d48bb788ec3f6d4e9d50b1d2baf5b9f
Parents: 80a50c9
Author: ravimagham <ra...@apache.org>
Authored: Thu Mar 5 16:18:57 2015 -0800
Committer: ravimagham <ra...@apache.org>
Committed: Wed Mar 11 20:41:36 2015 -0700

----------------------------------------------------------------------
 .../apache/phoenix/mapreduce/IndexToolIT.java   | 296 ++++++++++++++++++
 phoenix-core/src/main/antlr3/PhoenixSQL.g       |   7 +-
 .../phoenix/compile/PostIndexDDLCompiler.java   |  36 ++-
 .../apache/phoenix/jdbc/PhoenixStatement.java   |   8 +-
 .../phoenix/mapreduce/CsvBulkImportUtil.java    |   6 +-
 .../phoenix/mapreduce/CsvToKeyValueMapper.java  |  42 +--
 .../phoenix/mapreduce/PhoenixInputFormat.java   |   9 +-
 .../phoenix/mapreduce/PhoenixJobCounters.java   |  29 ++
 .../phoenix/mapreduce/index/IndexTool.java      | 302 +++++++++++++++++++
 .../mapreduce/index/PhoenixIndexDBWritable.java |  91 ++++++
 .../index/PhoenixIndexImportMapper.java         | 133 ++++++++
 .../phoenix/mapreduce/util/ConnectionUtil.java  |  14 +
 .../util/PhoenixConfigurationUtil.java          |  21 ++
 .../phoenix/parse/CreateIndexStatement.java     |   8 +-
 .../apache/phoenix/parse/ParseNodeFactory.java  |   4 +-
 .../apache/phoenix/schema/MetaDataClient.java   |   5 +
 .../org/apache/phoenix/util/ColumnInfo.java     |  13 +-
 .../java/org/apache/phoenix/util/QueryUtil.java |  46 ++-
 .../org/apache/phoenix/util/SchemaUtil.java     |   3 +
 .../mapreduce/CsvBulkImportUtilTest.java        |  14 +-
 .../mapreduce/CsvToKeyValueMapperTest.java      |  26 +-
 .../org/apache/phoenix/util/ColumnInfoTest.java |   8 +-
 .../org/apache/phoenix/util/QueryUtilTest.java  |   2 +-
 .../pig/util/PhoenixPigSchemaUtilTest.java      |   5 +-
 24 files changed, 1048 insertions(+), 80 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/55f64eaf/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/IndexToolIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/IndexToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/IndexToolIT.java
new file mode 100644
index 0000000..2b7b16b
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/mapreduce/IndexToolIT.java
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+import static org.apache.phoenix.query.BaseTest.setUpConfigForMiniCluster;
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+import org.apache.phoenix.jdbc.PhoenixDriver;
+import org.apache.phoenix.mapreduce.index.IndexTool;
+import org.apache.phoenix.util.MetaDataUtil;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.QueryUtil;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Tests for the {@link IndexTool}
+ */
+@Category(NeedsOwnMiniClusterTest.class)
+public class IndexToolIT {
+    
+    private static HBaseTestingUtility hbaseTestUtil;
+    private static String zkQuorum;
+  
+    @BeforeClass
+    public static void setUp() throws Exception {
+        hbaseTestUtil = new HBaseTestingUtility();
+        Configuration conf = hbaseTestUtil.getConfiguration();
+        setUpConfigForMiniCluster(conf);
+        hbaseTestUtil.startMiniCluster();
+        hbaseTestUtil.startMiniMapReduceCluster();
+        Class.forName(PhoenixDriver.class.getName());
+        zkQuorum = "localhost:" + hbaseTestUtil.getZkCluster().getClientPort();
+    }
+    
+    @Test
+    public void testImmutableGlobalIndex() throws Exception {
+        testSecondaryIndex("DATA_TABLE1",true, false);
+    }
+    
+    @Test
+    public void testImmutableLocalIndex() throws Exception {
+        testSecondaryIndex("DATA_TABLE2",true, true);
+    }
+    
+    @Test
+    public void testMutableGlobalIndex() throws Exception {
+        testSecondaryIndex("DATA_TABLE3",false, false);
+    }
+    
+    @Test
+    public void testMutableLocalIndex() throws Exception {
+        testSecondaryIndex("DATA_TABLE4",false, true);
+    }
+    
+    public void testSecondaryIndex(final String dataTable , final boolean isImmutable , final boolean isLocal) throws Exception {
+        
+        final String indxTable = String.format("%s_%s",dataTable,"INDX");
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(PhoenixRuntime.JDBC_PROTOCOL + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + zkQuorum,props);
+        Statement stmt = conn.createStatement();
+        try {
+        
+            stmt.execute(String.format("CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER) %s",dataTable, (isImmutable ? "IMMUTABLE_ROWS=true" :"")));
+            String upsertQuery = String.format("UPSERT INTO %s VALUES(?, ?, ?)",dataTable);
+            PreparedStatement stmt1 = conn.prepareStatement(upsertQuery);
+            
+            int id = 1;
+            // insert two rows
+            upsertRow(stmt1, id++);
+            upsertRow(stmt1, id++);
+            conn.commit();
+            
+            stmt.execute(String.format("CREATE %s INDEX %s ON %s (UPPER(NAME)) ASYNC ", (isLocal ? "LOCAL" : ""), indxTable,dataTable));
+   
+            //verify rows are fetched from data table.
+            String selectSql = String.format("SELECT UPPER(NAME),ID FROM %s",dataTable);
+            ResultSet rs = conn.createStatement().executeQuery("EXPLAIN " + selectSql);
+            String actualExplainPlan = QueryUtil.getExplainPlan(rs);
+            
+            //assert we are pulling from data table.
+            assertEquals(String.format("CLIENT 1-CHUNK PARALLEL 1-WAY FULL SCAN OVER %s",dataTable),actualExplainPlan);
+            
+            rs = stmt1.executeQuery(selectSql);
+            assertTrue(rs.next());
+            assertEquals("UNAME1", rs.getString(1));    
+            assertTrue(rs.next());
+            assertEquals("UNAME2", rs.getString(1));
+           
+            //run the index MR job.
+            final IndexTool indexingTool = new IndexTool();
+            indexingTool.setConf(new Configuration(hbaseTestUtil.getConfiguration()));
+            
+            final String[] cmdArgs = getArgValues(dataTable,indxTable);
+            int status = indexingTool.run(cmdArgs);
+            assertEquals(0, status);
+            
+            // insert two more rows
+            upsertRow(stmt1, 3);
+            upsertRow(stmt1, 4);
+            conn.commit();
+
+            //assert we are pulling from index table.
+            rs = conn.createStatement().executeQuery("EXPLAIN " + selectSql);
+            actualExplainPlan = QueryUtil.getExplainPlan(rs);
+            assertExplainPlan(actualExplainPlan,dataTable,indxTable,isLocal);
+            
+            rs = stmt.executeQuery(selectSql);
+            assertTrue(rs.next());
+            assertEquals("UNAME1", rs.getString(1));
+            assertEquals(1, rs.getInt(2));
+            
+            assertTrue(rs.next());
+            assertEquals("UNAME2", rs.getString(1));
+            assertEquals(2, rs.getInt(2));
+
+            assertTrue(rs.next());
+            assertEquals("UNAME3", rs.getString(1));
+            assertEquals(3, rs.getInt(2));
+            
+            assertTrue(rs.next());
+            assertEquals("UNAME4", rs.getString(1));
+            assertEquals(4, rs.getInt(2));
+      
+            assertFalse(rs.next());
+            
+            conn.createStatement().execute(String.format("DROP INDEX  %s ON %s",indxTable , dataTable));
+        } finally {
+            conn.close();
+        }
+    }
+    
+    
+    /**
+     * This test is to assert that updates that happen to rows of a mutable table after an index is created in ASYNC mode and before
+     * the MR job runs, do show up in the index table . 
+     * @throws Exception
+     */
+    @Test
+    public void testMutalbleIndexWithUpdates() throws Exception {
+        
+        final String dataTable = "DATA_TABLE5";
+        final String indxTable = String.format("%s_%s",dataTable,"INDX");
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(PhoenixRuntime.JDBC_PROTOCOL + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + zkQuorum,props);
+        Statement stmt = conn.createStatement();
+        try {
+        
+            stmt.execute(String.format("CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER)",dataTable));
+            String upsertQuery = String.format("UPSERT INTO %s VALUES(?, ?, ?)",dataTable);
+            PreparedStatement stmt1 = conn.prepareStatement(upsertQuery);
+            
+            int id = 1;
+            // insert two rows
+            upsertRow(stmt1, id++);
+            upsertRow(stmt1, id++);
+            conn.commit();
+            
+            stmt.execute(String.format("CREATE INDEX %s ON %s (UPPER(NAME)) ASYNC ", indxTable,dataTable));
+            
+            //update a row 
+            stmt1.setInt(1, 1);
+            stmt1.setString(2, "uname" + String.valueOf(10));
+            stmt1.setInt(3, 95050 + 1);
+            stmt1.executeUpdate();
+            conn.commit();  
+            
+            //verify rows are fetched from data table.
+            String selectSql = String.format("SELECT UPPER(NAME),ID FROM %s",dataTable);
+            ResultSet rs = conn.createStatement().executeQuery("EXPLAIN " + selectSql);
+            String actualExplainPlan = QueryUtil.getExplainPlan(rs);
+            
+            //assert we are pulling from data table.
+            assertEquals(String.format("CLIENT 1-CHUNK PARALLEL 1-WAY FULL SCAN OVER %s",dataTable),actualExplainPlan);
+            
+            rs = stmt1.executeQuery(selectSql);
+            assertTrue(rs.next());
+            assertEquals("UNAME10", rs.getString(1));
+            assertTrue(rs.next());
+            assertEquals("UNAME2", rs.getString(1));
+           
+            //run the index MR job.
+            final IndexTool indexingTool = new IndexTool();
+            indexingTool.setConf(new Configuration(hbaseTestUtil.getConfiguration()));
+            
+            final String[] cmdArgs = getArgValues(dataTable,indxTable);
+            int status = indexingTool.run(cmdArgs);
+            assertEquals(0, status);
+            
+            //assert we are pulling from index table.
+            rs = conn.createStatement().executeQuery("EXPLAIN " + selectSql);
+            actualExplainPlan = QueryUtil.getExplainPlan(rs);
+            assertExplainPlan(actualExplainPlan,dataTable,indxTable,false);
+            
+            rs = stmt.executeQuery(selectSql);
+            assertTrue(rs.next());
+            assertEquals("UNAME10", rs.getString(1));
+            assertEquals(1, rs.getInt(2));
+            
+            assertTrue(rs.next());
+            assertEquals("UNAME2", rs.getString(1));
+            assertEquals(2, rs.getInt(2));
+            conn.createStatement().execute(String.format("DROP INDEX  %s ON %s",indxTable , dataTable));
+        } finally {
+            conn.close();
+        }
+    }
+    
+    private void assertExplainPlan(final String actualExplainPlan, String dataTable,
+            String indxTable, boolean isLocal) {
+        
+        String expectedExplainPlan = "";
+        if(isLocal) {
+            final String localIndexName = MetaDataUtil.getLocalIndexTableName(dataTable);
+            expectedExplainPlan = String.format("CLIENT 1-CHUNK PARALLEL 1-WAY RANGE SCAN OVER %s [-32768]"
+                + "\n    SERVER FILTER BY FIRST KEY ONLY"
+                + "\nCLIENT MERGE SORT", localIndexName);
+        } else {
+            expectedExplainPlan = String.format("CLIENT 1-CHUNK PARALLEL 1-WAY FULL SCAN OVER %s"
+                    + "\n    SERVER FILTER BY FIRST KEY ONLY",indxTable);
+        }
+        assertEquals(expectedExplainPlan,actualExplainPlan);
+    }
+
+    private String[] getArgValues(String dataTable, String indxTable) {
+        final List<String> args = Lists.newArrayList();
+        args.add("-dt");
+        args.add(dataTable);
+        args.add("-it");
+        args.add(indxTable);
+        args.add("-op");
+        args.add("/tmp/"+UUID.randomUUID().toString());
+        return args.toArray(new String[0]);
+    }
+
+    private void upsertRow(PreparedStatement stmt, int i) throws SQLException {
+        // insert row
+        stmt.setInt(1, i);
+        stmt.setString(2, "uname" + String.valueOf(i));
+        stmt.setInt(3, 95050 + i);
+        stmt.executeUpdate();
+    }
+
+    @AfterClass
+    public static void tearDownAfterClass() throws Exception {
+        try {
+            PhoenixDriver.INSTANCE.close();
+        } finally {
+            try {
+                DriverManager.deregisterDriver(PhoenixDriver.INSTANCE);
+            } finally {                    
+                try {
+                    hbaseTestUtil.shutdownMiniMapReduceCluster();
+                } finally {
+                    hbaseTestUtil.shutdownMiniCluster();
+                }
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/55f64eaf/phoenix-core/src/main/antlr3/PhoenixSQL.g
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/antlr3/PhoenixSQL.g b/phoenix-core/src/main/antlr3/PhoenixSQL.g
index 9e843a0..9750ee7 100644
--- a/phoenix-core/src/main/antlr3/PhoenixSQL.g
+++ b/phoenix-core/src/main/antlr3/PhoenixSQL.g
@@ -84,6 +84,7 @@ tokens
     WITHIN='within';
     SET='set';
     CAST='cast';
+    ACTIVE='active';
     USABLE='usable';
     UNUSABLE='unusable';
     DISABLE='disable';
@@ -109,6 +110,7 @@ tokens
     STATISTICS='statistics';    
     COLUMNS='columns';
     TRACE='trace';
+    ASYNC='async';
 }
 
 
@@ -405,9 +407,10 @@ create_index_node returns [CreateIndexStatement ret]
     :   CREATE l=LOCAL? INDEX (IF NOT ex=EXISTS)? i=index_name ON t=from_table_name
         (LPAREN ik=ik_constraint RPAREN)
         (INCLUDE (LPAREN icrefs=column_names RPAREN))?
+        (async=ASYNC)?
         (p=fam_properties)?
         (SPLIT ON v=value_expression_list)?
-        {ret = factory.createIndex(i, factory.namedTable(null,t), ik, icrefs, v, p, ex!=null, l==null ? IndexType.getDefault() : IndexType.LOCAL, getBindCount()); }
+        {ret = factory.createIndex(i, factory.namedTable(null,t), ik, icrefs, v, p, ex!=null, l==null ? IndexType.getDefault() : IndexType.LOCAL, async != null, getBindCount()); }
     ;
 
 // Parse a create sequence statement.
@@ -498,7 +501,7 @@ drop_index_node returns [DropIndexStatement ret]
 
 // Parse a alter index statement
 alter_index_node returns [AlterIndexStatement ret]
-    : ALTER INDEX (IF ex=EXISTS)? i=index_name ON t=from_table_name s=(USABLE | UNUSABLE | REBUILD | DISABLE)
+    : ALTER INDEX (IF ex=EXISTS)? i=index_name ON t=from_table_name s=(USABLE | UNUSABLE | REBUILD | DISABLE | ACTIVE)
       {ret = factory.alterIndex(factory.namedTable(null, TableName.create(t.getSchemaName(), i.getName())), t.getTableName(), ex!=null, PIndexState.valueOf(SchemaUtil.normalizeIdentifier(s.getText()))); }
     ;
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/55f64eaf/phoenix-core/src/main/java/org/apache/phoenix/compile/PostIndexDDLCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostIndexDDLCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostIndexDDLCompiler.java
index c8cf28e..5836b99 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostIndexDDLCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostIndexDDLCompiler.java
@@ -28,6 +28,8 @@ import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.util.IndexUtil;
 
+import com.google.common.collect.Lists;
+
 
 /**
  * Class that compiles plan to generate initial data values after a DDL command for
@@ -36,10 +38,15 @@ import org.apache.phoenix.util.IndexUtil;
 public class PostIndexDDLCompiler {
     private final PhoenixConnection connection;
     private final TableRef dataTableRef;
-
+    private List<String> indexColumnNames;
+    private List<String> dataColumnNames;
+    private String selectQuery;
+    
     public PostIndexDDLCompiler(PhoenixConnection connection, TableRef dataTableRef) {
         this.connection = connection;
         this.dataTableRef = dataTableRef;
+        indexColumnNames = Lists.newArrayList();
+        dataColumnNames = Lists.newArrayList();
     }
 
     public MutationPlan compile(final PTable indexTable) throws SQLException {
@@ -66,8 +73,11 @@ public class PostIndexDDLCompiler {
         for (int i = posOffset; i < nIndexPKColumns; i++) {
             PColumn col = indexPKColumns.get(i);
             String indexColName = col.getName().getString();
-            dataColumns.append(col.getExpressionStr()).append(",");
+            String dataColName = col.getExpressionStr();
+            dataColumns.append(dataColName).append(",");
             indexColumns.append('"').append(indexColName).append("\",");
+            indexColumnNames.add(indexColName);
+            dataColumnNames.add(dataColName);
         }
         
         // Add the covered columns
@@ -82,6 +92,8 @@ public class PostIndexDDLCompiler {
                     }
                     dataColumns.append('"').append(dataColumnName).append("\",");
                     indexColumns.append('"').append(indexColName).append("\",");
+                    indexColumnNames.add(indexColName);
+                    dataColumnNames.add(dataColumnName);
                 }
             }
         }
@@ -93,11 +105,27 @@ public class PostIndexDDLCompiler {
         
         StringBuilder updateStmtStr = new StringBuilder();
         updateStmtStr.append("UPSERT /*+ NO_INDEX */ INTO ").append(schemaName.length() == 0 ? "" : '"' + schemaName + "\".").append('"').append(tableName).append("\"(")
-            .append(indexColumns).append(") SELECT ").append(dataColumns).append(" FROM ")
-            .append(schemaName.length() == 0 ? "" : '"' + schemaName + "\".").append('"').append(dataTableRef.getTable().getTableName().getString()).append('"');
+           .append(indexColumns).append(") ");
+        final StringBuilder selectQueryBuilder = new StringBuilder();
+        selectQueryBuilder.append(" SELECT ").append(dataColumns).append(" FROM ")
+        .append(schemaName.length() == 0 ? "" : '"' + schemaName + "\".").append('"').append(dataTableRef.getTable().getTableName().getString()).append('"');
+        this.selectQuery = selectQueryBuilder.toString();
+        updateStmtStr.append(this.selectQuery);
         
         final PhoenixStatement statement = new PhoenixStatement(connection);
         return statement.compileMutation(updateStmtStr.toString());
     }
 
+    public List<String> getIndexColumnNames() {
+        return indexColumnNames;
+    }
+
+    public List<String> getDataColumnNames() {
+        return dataColumnNames;
+    }
+
+    public String getSelectQuery() {
+        return selectQuery;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/55f64eaf/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
index 746c0b7..4e61391 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixStatement.java
@@ -535,8 +535,8 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho
     private static class ExecutableCreateIndexStatement extends CreateIndexStatement implements CompilableStatement {
 
         public ExecutableCreateIndexStatement(NamedNode indexName, NamedTableNode dataTable, IndexKeyConstraint ikConstraint, List<ColumnName> includeColumns, List<ParseNode> splits,
-                ListMultimap<String,Pair<String,Object>> props, boolean ifNotExists, IndexType indexType, int bindCount) {
-            super(indexName, dataTable, ikConstraint, includeColumns, splits, props, ifNotExists, indexType, bindCount);
+                ListMultimap<String,Pair<String,Object>> props, boolean ifNotExists, IndexType indexType, boolean async, int bindCount) {
+            super(indexName, dataTable, ikConstraint, includeColumns, splits, props, ifNotExists, indexType, async , bindCount);
         }
 
         @SuppressWarnings("unchecked")
@@ -879,8 +879,8 @@ public class PhoenixStatement implements Statement, SQLCloseable, org.apache.pho
         
         @Override
         public CreateIndexStatement createIndex(NamedNode indexName, NamedTableNode dataTable, IndexKeyConstraint ikConstraint, List<ColumnName> includeColumns, List<ParseNode> splits,
-                ListMultimap<String,Pair<String,Object>> props, boolean ifNotExists, IndexType indexType, int bindCount) {
-            return new ExecutableCreateIndexStatement(indexName, dataTable, ikConstraint, includeColumns, splits, props, ifNotExists, indexType, bindCount);
+                ListMultimap<String,Pair<String,Object>> props, boolean ifNotExists, IndexType indexType, boolean async, int bindCount) {
+            return new ExecutableCreateIndexStatement(indexName, dataTable, ikConstraint, includeColumns, splits, props, ifNotExists, indexType, async, bindCount);
         }
         
         @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/55f64eaf/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkImportUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkImportUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkImportUtil.java
index e62cbb8..8f0f7d5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkImportUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvBulkImportUtil.java
@@ -19,10 +19,12 @@ package org.apache.phoenix.mapreduce;
 
 import java.util.List;
 
-import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
 import org.apache.phoenix.util.ColumnInfo;
 
+import com.google.common.base.Preconditions;
+
 /**
  * Collection of utility methods for setting up bulk import jobs.
  */
@@ -65,7 +67,7 @@ public class CsvBulkImportUtil {
      */
     public static void configurePreUpsertProcessor(Configuration conf,
             Class<? extends ImportPreUpsertKeyValueProcessor> processorClass) {
-        conf.setClass(CsvToKeyValueMapper.UPSERT_HOOK_CLASS_CONFKEY, processorClass,
+        conf.setClass(PhoenixConfigurationUtil.UPSERT_HOOK_CLASS_CONFKEY, processorClass,
                 ImportPreUpsertKeyValueProcessor.class);
     }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/55f64eaf/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java
index 6ff7ba3..90cb854 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapper.java
@@ -28,15 +28,6 @@ import java.util.Properties;
 
 import javax.annotation.Nullable;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Splitter;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-
 import org.apache.commons.csv.CSVFormat;
 import org.apache.commons.csv.CSVParser;
 import org.apache.commons.csv.CSVRecord;
@@ -49,9 +40,9 @@ import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDriver;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
 import org.apache.phoenix.util.CSVCommonsLoader;
 import org.apache.phoenix.util.ColumnInfo;
 import org.apache.phoenix.util.PhoenixRuntime;
@@ -59,6 +50,15 @@ import org.apache.phoenix.util.csv.CsvUpsertExecutor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+
 /**
  * MapReduce mapper that converts CSV input lines into KeyValues that can be written to HFiles.
  * <p/>
@@ -73,9 +73,6 @@ public class CsvToKeyValueMapper extends Mapper<LongWritable,Text,ImmutableBytes
 
     private static final String COUNTER_GROUP_NAME = "Phoenix MapReduce Import";
 
-    /** Configuration key for the class name of an ImportPreUpsertKeyValueProcessor */
-    public static final String UPSERT_HOOK_CLASS_CONFKEY = "phoenix.mapreduce.import.kvprocessor";
-
     /** Configuration key for the field delimiter for input csv records */
     public static final String FIELD_DELIMITER_CONFKEY = "phoenix.mapreduce.import.fielddelimiter";
 
@@ -136,7 +133,7 @@ public class CsvToKeyValueMapper extends Mapper<LongWritable,Text,ImmutableBytes
         csvLineParser = new CsvLineParser(conf.get(FIELD_DELIMITER_CONFKEY).charAt(0), conf.get(QUOTE_CHAR_CONFKEY).charAt(0),
                 conf.get(ESCAPE_CHAR_CONFKEY).charAt(0));
 
-        preUpdateProcessor = loadPreUpsertProcessor(conf);
+        preUpdateProcessor = PhoenixConfigurationUtil.loadPreUpsertProcessor(conf);
         if(!conf.get(CsvToKeyValueMapper.INDEX_TABLE_NAME_CONFKEY, "").isEmpty()){
         	tableName = Bytes.toBytes(conf.get(CsvToKeyValueMapper.INDEX_TABLE_NAME_CONFKEY));
         } else {
@@ -193,23 +190,6 @@ public class CsvToKeyValueMapper extends Mapper<LongWritable,Text,ImmutableBytes
     }
 
     /**
-     * Load the configured ImportPreUpsertKeyValueProcessor, or supply a dummy processor.
-     */
-    @VisibleForTesting
-    static ImportPreUpsertKeyValueProcessor loadPreUpsertProcessor(Configuration conf) {
-        Class<? extends ImportPreUpsertKeyValueProcessor> processorClass = null;
-        try {
-            processorClass = conf.getClass(
-                    UPSERT_HOOK_CLASS_CONFKEY, DefaultImportPreUpsertKeyValueProcessor.class,
-                    ImportPreUpsertKeyValueProcessor.class);
-        } catch (Exception e) {
-            throw new IllegalStateException("Couldn't load upsert hook class", e);
-        }
-
-        return ReflectionUtils.newInstance(processorClass, conf);
-    }
-
-    /**
      * Build up the JDBC URL for connecting to Phoenix.
      *
      * @return the full JDBC URL for a Phoenix connection

http://git-wip-us.apache.org/repos/asf/phoenix/blob/55f64eaf/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java
index 7c67c2c..a83b9ae 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java
@@ -22,6 +22,7 @@ import java.sql.Connection;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.List;
+import java.util.Properties;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -39,6 +40,7 @@ import org.apache.phoenix.jdbc.PhoenixStatement;
 import org.apache.phoenix.mapreduce.util.ConnectionUtil;
 import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
 import org.apache.phoenix.query.KeyRange;
+import org.apache.phoenix.util.PhoenixRuntime;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
@@ -99,7 +101,12 @@ public class PhoenixInputFormat<T extends DBWritable> extends InputFormat<NullWr
     private QueryPlan getQueryPlan(final JobContext context,final Configuration configuration) throws IOException {
         Preconditions.checkNotNull(context);
         try{
-            final Connection connection = ConnectionUtil.getConnection(configuration);
+            final String currentScnValue = configuration.get(PhoenixConfigurationUtil.CURRENT_SCN_VALUE);
+            final Properties overridingProps = new Properties();
+            if(currentScnValue != null) {
+                overridingProps.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, currentScnValue);
+            }
+            final Connection connection = ConnectionUtil.getConnection(configuration,overridingProps);
             final String selectStatement = PhoenixConfigurationUtil.getSelectStatement(configuration);
             Preconditions.checkNotNull(selectStatement);
             final Statement statement = connection.createStatement();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/55f64eaf/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixJobCounters.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixJobCounters.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixJobCounters.java
new file mode 100644
index 0000000..4a869d9
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixJobCounters.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce;
+
+/**
+ *  Counters used during Map Reduce jobs
+ *
+ */
+public enum PhoenixJobCounters {
+
+    INPUT_RECORDS,
+    FAILED_RECORDS,
+    OUTPUT_RECORDS;
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/55f64eaf/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
new file mode 100644
index 0000000..d93ef9c
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.index;
+
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.cli.PosixParser;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
+import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.phoenix.compile.PostIndexDDLCompiler;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.mapreduce.util.ColumnInfoToStringEncoderDecoder;
+import org.apache.phoenix.mapreduce.util.ConnectionUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixMapReduceUtil;
+import org.apache.phoenix.parse.HintNode.Hint;
+import org.apache.phoenix.schema.PIndexState;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.PTable.IndexType;
+import org.apache.phoenix.schema.TableRef;
+import org.apache.phoenix.util.ColumnInfo;
+import org.apache.phoenix.util.MetaDataUtil;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.SchemaUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * An MR job to populate the index table from the data table.
+ *
+ */
+public class IndexTool extends Configured implements Tool {
+
+    private static final Logger LOG = LoggerFactory.getLogger(IndexTool.class);
+
+    private static final Option SCHEMA_NAME_OPTION = new Option("s", "schema", true, "Phoenix schema name (optional)");
+    private static final Option DATA_TABLE_OPTION = new Option("dt", "data-table", true, "Data table name (mandatory)");
+    private static final Option INDEX_TABLE_OPTION = new Option("it", "index-table", true, "Index table name(mandatory)");
+    private static final Option OUTPUT_PATH_OPTION = new Option("op", "output-path", true, "Output path where the files are written(mandatory)");
+    private static final Option HELP_OPTION = new Option("h", "help", false, "Help");
+    
+    private static final String ALTER_INDEX_QUERY_TEMPLATE = "ALTER INDEX IF EXISTS %s ON %s %s";  
+    private static final String INDEX_JOB_NAME_TEMPLATE = "PHOENIX_%s_INDX_%s";
+    
+    
+    private Options getOptions() {
+        final Options options = new Options();
+        options.addOption(SCHEMA_NAME_OPTION);
+        options.addOption(DATA_TABLE_OPTION);
+        options.addOption(INDEX_TABLE_OPTION);
+        options.addOption(OUTPUT_PATH_OPTION);
+        options.addOption(HELP_OPTION);
+        return options;
+    }
+    
+    /**
+     * Parses the commandline arguments, throws IllegalStateException if mandatory arguments are
+     * missing.
+     *
+     * @param args supplied command line arguments
+     * @return the parsed command line
+     */
+    private CommandLine parseOptions(String[] args) {
+
+        final Options options = getOptions();
+
+        CommandLineParser parser = new PosixParser();
+        CommandLine cmdLine = null;
+        try {
+            cmdLine = parser.parse(options, args);
+        } catch (ParseException e) {
+            printHelpAndExit("Error parsing command line options: "+ e.getMessage(), options);
+        }
+
+        if (cmdLine.hasOption(HELP_OPTION.getOpt())) {
+            printHelpAndExit(options, 0);
+        }
+
+        if (!cmdLine.hasOption(DATA_TABLE_OPTION.getOpt())) {
+            throw new IllegalStateException(DATA_TABLE_OPTION.getLongOpt() + " is a mandatory " 
+                   + "parameter");
+        }
+        
+        if (!cmdLine.hasOption(INDEX_TABLE_OPTION.getOpt())) {
+            throw new IllegalStateException(INDEX_TABLE_OPTION.getLongOpt() + " is a mandatory " 
+                   + "parameter");
+        }
+
+        if (!cmdLine.hasOption(OUTPUT_PATH_OPTION.getOpt())) {
+            throw new IllegalStateException(OUTPUT_PATH_OPTION.getLongOpt() + " is a mandatory " 
+                   + "parameter");
+        }
+        return cmdLine;
+    }
+
+   
+    private void printHelpAndExit(String errorMessage, Options options) {
+        System.err.println(errorMessage);
+        printHelpAndExit(options, 1);
+    }
+
+    private void printHelpAndExit(Options options, int exitCode) {
+        HelpFormatter formatter = new HelpFormatter();
+        formatter.printHelp("help", options);
+        System.exit(exitCode);
+    }
+    
+    @Override
+    public int run(String[] args) throws Exception {
+        Connection connection = null;
+        try {
+            CommandLine cmdLine = null;
+            try {
+                cmdLine = parseOptions(args);
+            } catch (IllegalStateException e) {
+                printHelpAndExit(e.getMessage(), getOptions());
+            }
+            final Configuration configuration = HBaseConfiguration.addHbaseResources(getConf());
+            final String schemaName = cmdLine.getOptionValue(SCHEMA_NAME_OPTION.getOpt());
+            final String dataTable = cmdLine.getOptionValue(DATA_TABLE_OPTION.getOpt());
+            final String indexTable = cmdLine.getOptionValue(INDEX_TABLE_OPTION.getOpt());
+            final String qDataTable = SchemaUtil.getTableName(schemaName, dataTable);
+            final String qIndexTable = SchemaUtil.getTableName(schemaName, indexTable);
+         
+            connection = ConnectionUtil.getConnection(configuration);
+            if(!isValidIndexTable(connection, dataTable, indexTable)) {
+                throw new IllegalArgumentException(String.format(" %s is not an index table for %s ",qIndexTable,qDataTable));
+            }
+            
+            final PTable pdataTable = PhoenixRuntime.getTable(connection, dataTable);
+            final PTable pindexTable = PhoenixRuntime.getTable(connection, indexTable);
+            
+            // this is set to ensure index tables remains consistent post population.
+            long indxTimestamp = pindexTable.getTimeStamp();
+            configuration.set(PhoenixConfigurationUtil.CURRENT_SCN_VALUE,Long.toString(indxTimestamp + 1));
+            
+            // check if the index type is LOCAL, if so, set the logicalIndexName that is computed from the dataTable name.
+            String logicalIndexTable = qIndexTable;
+            if(IndexType.LOCAL.equals(pindexTable.getIndexType())) {
+                logicalIndexTable  = MetaDataUtil.getLocalIndexTableName(dataTable);
+            }
+            
+            final PhoenixConnection pConnection = connection.unwrap(PhoenixConnection.class);
+            final PostIndexDDLCompiler ddlCompiler = new PostIndexDDLCompiler(pConnection,new TableRef(pdataTable));
+            ddlCompiler.compile(pindexTable);
+            
+            final List<String> indexColumns = ddlCompiler.getIndexColumnNames();
+            final String selectQuery = ddlCompiler.getSelectQuery();
+            final String upsertQuery = QueryUtil.constructUpsertStatement(indexTable, indexColumns, Hint.NO_INDEX);
+       
+            configuration.set(PhoenixConfigurationUtil.UPSERT_STATEMENT, upsertQuery);
+            PhoenixConfigurationUtil.setOutputTableName(configuration, logicalIndexTable);
+            PhoenixConfigurationUtil.setUpsertColumnNames(configuration,indexColumns.toArray(new String[indexColumns.size()]));
+            final List<ColumnInfo> columnMetadataList = PhoenixRuntime.generateColumnInfo(connection, qIndexTable, indexColumns);
+            final String encodedColumnInfos = ColumnInfoToStringEncoderDecoder.encode(columnMetadataList);
+            configuration.set(PhoenixConfigurationUtil.UPSERT_COLUMN_INFO_KEY, encodedColumnInfos);
+            
+            final Path outputPath =  new Path(cmdLine.getOptionValue(OUTPUT_PATH_OPTION.getOpt()),logicalIndexTable);
+            
+            final String jobName = String.format(INDEX_JOB_NAME_TEMPLATE,dataTable,indexTable);
+            final Job job = Job.getInstance(configuration, jobName);
+            job.setJarByClass(IndexTool.class);
+           
+            job.setMapperClass(PhoenixIndexImportMapper.class);
+            job.setMapOutputKeyClass(ImmutableBytesWritable.class); 
+            job.setMapOutputValueClass(KeyValue.class);
+            PhoenixMapReduceUtil.setInput(job,PhoenixIndexDBWritable.class,dataTable,selectQuery);
+     
+            TableMapReduceUtil.initCredentials(job);
+            FileOutputFormat.setOutputPath(job, outputPath);
+            
+            final HTable htable = new HTable(configuration, logicalIndexTable);
+            HFileOutputFormat.configureIncrementalLoad(job, htable);
+
+            boolean status = job.waitForCompletion(true);
+            if (!status) {
+                LOG.error("Failed to run the IndexTool job. ");
+                htable.close();
+                return -1;
+            }
+            
+            LOG.info("Loading HFiles from {}", outputPath);
+            LoadIncrementalHFiles loader = new LoadIncrementalHFiles(configuration);
+            loader.doBulkLoad(outputPath, htable);
+            htable.close();
+            
+            LOG.info("Removing output directory {}", outputPath);
+            if (!FileSystem.get(configuration).delete(outputPath, true)) {
+                LOG.error("Removing output directory {} failed", outputPath);
+            }
+            
+            // finally update the index state to ACTIVE.
+            updateIndexState(connection,dataTable,indexTable,PIndexState.ACTIVE);
+            return 0;
+            
+        } catch (Exception ex) {
+           LOG.error(" An exception occured while performing the indexing job , error message {} ",ex.getMessage());
+           return -1;
+        } finally {
+            try {
+                if(connection != null) {
+                    connection.close();
+                }
+            } catch(SQLException sqle) {
+                LOG.error(" Failed to close connection ",sqle.getMessage());
+                throw new RuntimeException("Failed to close connection");
+            }
+        }
+    }
+
+    /**
+     * Checks for the validity of the index table passed to the job.
+     * @param connection
+     * @param masterTable
+     * @param indexTable
+     * @return
+     * @throws SQLException
+     */
+    private boolean isValidIndexTable(final Connection connection, final String masterTable, final String indexTable) throws SQLException {
+        final DatabaseMetaData dbMetaData = connection.getMetaData();
+        final String schemaName = SchemaUtil.getSchemaNameFromFullName(masterTable);
+        final String tableName = SchemaUtil.getTableNameFromFullName(masterTable);
+        
+        ResultSet rs = null;
+        try {
+            rs = dbMetaData.getIndexInfo(null, schemaName, tableName, false, false);
+            while(rs.next()) {
+                final String indexName = rs.getString(6);
+                if(indexTable.equalsIgnoreCase(indexName)) {
+                    return true;
+                }
+            }
+        } finally {
+            if(rs != null) {
+                rs.close();
+            }
+        }
+        return false;
+    }
+    
+    /**
+     * Updates the index state.
+     * @param connection
+     * @param masterTable
+     * @param indexTable
+     * @param state
+     * @throws SQLException
+     */
+    private void updateIndexState(Connection connection, final String masterTable , final String indexTable, PIndexState state) throws SQLException {
+        Preconditions.checkNotNull(connection);
+        final String alterQuery = String.format(ALTER_INDEX_QUERY_TEMPLATE,indexTable,masterTable,state.name());
+        connection.createStatement().execute(alterQuery);
+        LOG.info(" Updated the status of the index {} to {} " , indexTable , state.name());
+    }
+    
+    public static void main(final String[] args) throws Exception {
+        int result = ToolRunner.run(new IndexTool(), args);
+        System.exit(result);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/55f64eaf/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexDBWritable.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexDBWritable.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexDBWritable.java
new file mode 100644
index 0000000..2be810a
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexDBWritable.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.index;
+
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.hadoop.mapreduce.lib.db.DBWritable;
+import org.apache.phoenix.util.ColumnInfo;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+/**
+ * A {@link DBWritable} class that reads and write records.
+ *
+ * 
+ */
+public class PhoenixIndexDBWritable  implements DBWritable { 
+    
+    private List<ColumnInfo> columnMetadata;
+    
+    private List<Object> values;
+    
+    private int columnCount = -1;
+    
+    @Override
+    public void write(PreparedStatement statement) throws SQLException {
+       Preconditions.checkNotNull(values);
+       Preconditions.checkNotNull(columnMetadata);
+       for(int i = 0 ; i < values.size() ; i++) {
+           Object value = values.get(i);
+           ColumnInfo columnInfo = columnMetadata.get(i);
+           if(value == null) {
+               statement.setNull(i + 1, columnInfo.getSqlType());               
+           } else {
+               statement.setObject(i + 1, value , columnInfo.getSqlType());
+           }
+       }
+       
+    }
+
+    @Override
+    public void readFields(ResultSet resultSet) throws SQLException {
+        // we do this once per mapper.
+        if(columnCount == -1) {
+            this.columnCount = resultSet.getMetaData().getColumnCount();
+        }
+  
+        values = Lists.newArrayListWithCapacity(columnCount);
+        for(int i = 0 ; i < columnCount ; i++) {
+            Object value = resultSet.getObject(i + 1);
+            values.add(value);
+        }
+        
+    }
+
+    public List<ColumnInfo> getColumnMetadata() {
+        return columnMetadata;
+    }
+
+    public void setColumnMetadata(List<ColumnInfo> columnMetadata) {
+        this.columnMetadata = columnMetadata;
+    }
+
+    public List<Object> getValues() {
+        return values;
+    }
+
+    public void setValues(List<Object> values) {
+        this.values = values;
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/55f64eaf/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportMapper.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportMapper.java
new file mode 100644
index 0000000..7bf4bfc
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportMapper.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.mapreduce.index;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.phoenix.mapreduce.ImportPreUpsertKeyValueProcessor;
+import org.apache.phoenix.mapreduce.PhoenixJobCounters;
+import org.apache.phoenix.mapreduce.util.ConnectionUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.util.ColumnInfo;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Mapper that hands over rows from data table to the index table.
+ *
+ */
+public class PhoenixIndexImportMapper extends Mapper<NullWritable, PhoenixIndexDBWritable, ImmutableBytesWritable, KeyValue> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(PhoenixIndexImportMapper.class);
+    
+    private final PhoenixIndexDBWritable indxWritable = new PhoenixIndexDBWritable();
+    
+    private List<ColumnInfo> indxTblColumnMetadata ;
+    
+    private Connection connection;
+    
+    private String indexTableName;
+    
+    private ImportPreUpsertKeyValueProcessor preUpdateProcessor;
+    
+    private PreparedStatement pStatement;
+    
+    @Override
+    protected void setup(final Context context) throws IOException, InterruptedException {
+        super.setup(context);
+        final Configuration configuration = context.getConfiguration();
+        try {
+            indxTblColumnMetadata = PhoenixConfigurationUtil.getUpsertColumnMetadataList(context.getConfiguration());
+            indxWritable.setColumnMetadata(indxTblColumnMetadata);
+            
+            preUpdateProcessor = PhoenixConfigurationUtil.loadPreUpsertProcessor(configuration);
+            indexTableName = PhoenixConfigurationUtil.getOutputTableName(configuration);
+            final Properties overrideProps = new Properties ();
+            overrideProps.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, configuration.get(PhoenixConfigurationUtil.CURRENT_SCN_VALUE));
+            connection = ConnectionUtil.getConnection(configuration,overrideProps);
+            connection.setAutoCommit(false);
+            final String upsertQuery = PhoenixConfigurationUtil.getUpsertStatement(configuration);
+            this.pStatement = connection.prepareStatement(upsertQuery);
+            
+        } catch (SQLException e) {
+            throw new RuntimeException(e.getMessage());
+        } 
+    }
+    
+    @Override
+    protected void map(NullWritable key, PhoenixIndexDBWritable record, Context context)
+            throws IOException, InterruptedException {
+       
+        context.getCounter(PhoenixJobCounters.INPUT_RECORDS).increment(1);
+        
+        try {
+           final ImmutableBytesWritable outputKey = new ImmutableBytesWritable();
+           final List<Object> values = record.getValues();
+           indxWritable.setValues(values);
+           indxWritable.write(this.pStatement);
+           this.pStatement.execute();
+            
+           final Iterator<Pair<byte[], List<KeyValue>>> uncommittedDataIterator = PhoenixRuntime.getUncommittedDataIterator(connection, true);
+           while (uncommittedDataIterator.hasNext()) {
+                Pair<byte[], List<KeyValue>> kvPair = uncommittedDataIterator.next();
+                if (Bytes.compareTo(Bytes.toBytes(indexTableName), kvPair.getFirst()) != 0) {
+                    // skip edits for other tables
+                    continue;
+                }
+                List<KeyValue> keyValueList = kvPair.getSecond();
+                keyValueList = preUpdateProcessor.preUpsert(kvPair.getFirst(), keyValueList);
+                for (KeyValue kv : keyValueList) {
+                    outputKey.set(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength());
+                    context.write(outputKey, kv);
+                }
+                context.getCounter(PhoenixJobCounters.OUTPUT_RECORDS).increment(1);
+            }
+            connection.rollback();
+       } catch (SQLException e) {
+           LOG.error(" Error {}  while read/write of a record ",e.getMessage());
+           context.getCounter(PhoenixJobCounters.FAILED_RECORDS).increment(1);
+           throw new RuntimeException(e);
+        } 
+    }
+
+    @Override
+    protected void cleanup(Context context) throws IOException, InterruptedException {
+         super.cleanup(context);
+         if(connection != null) {
+             try {
+                connection.close();
+            } catch (SQLException e) {
+                LOG.error("Error {} while closing connection in the PhoenixIndexMapper class ",e.getMessage());
+            }
+         }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/55f64eaf/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ConnectionUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ConnectionUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ConnectionUtil.java
index 364baf7..3234967 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ConnectionUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ConnectionUtil.java
@@ -42,6 +42,17 @@ public class ConnectionUtil {
      * @throws SQLException
      */
     public static Connection getConnection(final Configuration configuration) throws SQLException {
+        return getConnection(configuration, null);
+    }
+    
+    /**
+     * Used primarily in cases where we need to pass few additional/overriding properties 
+     * @param configuration
+     * @param properties
+     * @return
+     * @throws SQLException
+     */
+    public static Connection getConnection(final Configuration configuration , final Properties properties) throws SQLException {
         Preconditions.checkNotNull(configuration);
         final Properties props = new Properties();
         Iterator<Map.Entry<String, String>> iterator = configuration.iterator();
@@ -51,6 +62,9 @@ public class ConnectionUtil {
                 props.setProperty(entry.getKey(), entry.getValue());
             }
         }
+        if(properties != null && !properties.isEmpty()) {
+            props.putAll(properties);
+        }
         final Connection conn = DriverManager.getConnection(QueryUtil.getUrl(configuration.get(HConstants.ZOOKEEPER_QUORUM)), props);
         return conn;
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/55f64eaf/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
index 4d025ee..b8b64b2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
@@ -33,7 +33,10 @@ import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.db.DBInputFormat.NullDBWritable;
 import org.apache.hadoop.mapreduce.lib.db.DBWritable;
+import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.mapreduce.CsvToKeyValueMapper.DefaultImportPreUpsertKeyValueProcessor;
+import org.apache.phoenix.mapreduce.ImportPreUpsertKeyValueProcessor;
 import org.apache.phoenix.mapreduce.PhoenixInputFormat;
 import org.apache.phoenix.util.ColumnInfo;
 import org.apache.phoenix.util.PhoenixRuntime;
@@ -83,6 +86,11 @@ public final class PhoenixConfigurationUtil {
 
     public static final String INPUT_CLASS = "phoenix.input.class";
     
+    public static final String CURRENT_SCN_VALUE = "phoenix.mr.currentscn.value";
+    
+    /** Configuration key for the class name of an ImportPreUpsertKeyValueProcessor */
+    public static final String UPSERT_HOOK_CLASS_CONFKEY = "phoenix.mapreduce.import.kvprocessor";
+    
     public enum SchemaType {
         TABLE,
         QUERY;
@@ -313,4 +321,17 @@ public final class PhoenixConfigurationUtil {
         //In order to have phoenix working on a secured cluster
         TableMapReduceUtil.initCredentials(job);
     }
+    
+    public static ImportPreUpsertKeyValueProcessor loadPreUpsertProcessor(Configuration conf) {
+        Class<? extends ImportPreUpsertKeyValueProcessor> processorClass = null;
+        try {
+            processorClass = conf.getClass(
+                    UPSERT_HOOK_CLASS_CONFKEY, DefaultImportPreUpsertKeyValueProcessor.class,
+                    ImportPreUpsertKeyValueProcessor.class);
+        } catch (Exception e) {
+            throw new IllegalStateException("Couldn't load upsert hook class", e);
+        }
+    
+        return ReflectionUtils.newInstance(processorClass, conf);
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/55f64eaf/phoenix-core/src/main/java/org/apache/phoenix/parse/CreateIndexStatement.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/CreateIndexStatement.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/CreateIndexStatement.java
index bf76174..5f52f59 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/CreateIndexStatement.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/CreateIndexStatement.java
@@ -35,10 +35,11 @@ public class CreateIndexStatement extends SingleTableStatement {
     private final ListMultimap<String,Pair<String,Object>> props;
     private final boolean ifNotExists;
     private final IndexType indexType;
+    private final boolean async;
 
     public CreateIndexStatement(NamedNode indexTableName, NamedTableNode dataTable, 
             IndexKeyConstraint indexKeyConstraint, List<ColumnName> includeColumns, List<ParseNode> splits,
-            ListMultimap<String,Pair<String,Object>> props, boolean ifNotExists, IndexType indexType, int bindCount) {
+            ListMultimap<String,Pair<String,Object>> props, boolean ifNotExists, IndexType indexType, boolean async, int bindCount) {
         super(dataTable, bindCount);
         this.indexTableName =TableName.create(dataTable.getName().getSchemaName(),indexTableName.getName());
         this.indexKeyConstraint = indexKeyConstraint == null ? IndexKeyConstraint.EMPTY : indexKeyConstraint;
@@ -47,6 +48,7 @@ public class CreateIndexStatement extends SingleTableStatement {
         this.props = props == null ? ArrayListMultimap.<String,Pair<String,Object>>create() : props;
         this.ifNotExists = ifNotExists;
         this.indexType = indexType;
+        this.async = async;
     }
 
     public IndexKeyConstraint getIndexConstraint() {
@@ -78,4 +80,8 @@ public class CreateIndexStatement extends SingleTableStatement {
         return indexType;
     }
 
+    public boolean isAsync() {
+        return async;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/55f64eaf/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
index 4e8f792..2a4168d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/ParseNodeFactory.java
@@ -282,8 +282,8 @@ public class ParseNodeFactory {
         return new CreateTableStatement(tableName, props, columns, pkConstraint, splits, tableType, ifNotExists, baseTableName, tableTypeIdNode, bindCount);
     }
 
-    public CreateIndexStatement createIndex(NamedNode indexName, NamedTableNode dataTable, IndexKeyConstraint ikConstraint, List<ColumnName> includeColumns, List<ParseNode> splits, ListMultimap<String,Pair<String,Object>> props, boolean ifNotExists, IndexType indexType, int bindCount) {
-        return new CreateIndexStatement(indexName, dataTable, ikConstraint, includeColumns, splits, props, ifNotExists, indexType, bindCount);
+    public CreateIndexStatement createIndex(NamedNode indexName, NamedTableNode dataTable, IndexKeyConstraint ikConstraint, List<ColumnName> includeColumns, List<ParseNode> splits, ListMultimap<String,Pair<String,Object>> props, boolean ifNotExists, IndexType indexType,boolean async, int bindCount) {
+        return new CreateIndexStatement(indexName, dataTable, ikConstraint, includeColumns, splits, props, ifNotExists, indexType, async, bindCount);
     }
 
     public CreateSequenceStatement createSequence(TableName tableName, ParseNode startsWith,

http://git-wip-us.apache.org/repos/asf/phoenix/blob/55f64eaf/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index e133433..ab3c284 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -1128,6 +1128,11 @@ public class MetaDataClient {
             return new MutationState(0,connection);
         }
 
+        // In async process, we return immediately as the MR job needs to be triggered .
+        if(statement.isAsync()) {
+            return new MutationState(0, connection);
+        }
+        
         // If our connection is at a fixed point-in-time, we need to open a new
         // connection so that our new index table is visible.
         if (connection.getSCN() != null) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/55f64eaf/phoenix-core/src/main/java/org/apache/phoenix/util/ColumnInfo.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ColumnInfo.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ColumnInfo.java
index 55865c0..46350be 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/ColumnInfo.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ColumnInfo.java
@@ -16,7 +16,6 @@ import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.types.PDataType;
 
 import com.google.common.base.Preconditions;
-import com.google.common.base.Splitter;
 import com.google.common.collect.Lists;
 
 /**
@@ -33,6 +32,9 @@ public class ColumnInfo {
     public ColumnInfo(String columnName, int sqlType) {
         Preconditions.checkNotNull(columnName, "columnName cannot be null");
         Preconditions.checkArgument(!columnName.isEmpty(), "columnName cannot be empty");
+        if(!columnName.startsWith(SchemaUtil.ESCAPE_CHARACTER)) {
+            columnName = SchemaUtil.getEscapedFullColumnName(columnName);
+        }
         this.columnName = columnName;
         this.sqlType = sqlType;
     }
@@ -63,7 +65,7 @@ public class ColumnInfo {
 
     @Override
     public String toString() {
-        return columnName + STR_SEPARATOR + getPDataType().getSqlTypeName();
+        return getPDataType().getSqlTypeName() + STR_SEPARATOR + columnName ;
     }
 
     @Override
@@ -97,14 +99,15 @@ public class ColumnInfo {
      */
     public static ColumnInfo fromString(String stringRepresentation) {
         List<String> components =
-                Lists.newArrayList(Splitter.on(STR_SEPARATOR).split(stringRepresentation));
+                Lists.newArrayList(stringRepresentation.split(":",2));
+        
         if (components.size() != 2) {
             throw new IllegalArgumentException("Unparseable string: " + stringRepresentation);
         }
 
         return new ColumnInfo(
-                components.get(0),
-                PDataType.fromSqlTypeName(components.get(1)).getSqlType());
+                components.get(1),
+                PDataType.fromSqlTypeName(components.get(0)).getSqlType());
     }
 
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/55f64eaf/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java
index e0b4c2e..993016a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java
@@ -39,6 +39,8 @@ import org.apache.hadoop.hbase.util.Addressing;
 import org.apache.hadoop.hbase.zookeeper.ZKConfig;
 import org.apache.phoenix.iterate.ResultIterator;
 import org.apache.phoenix.jdbc.PhoenixDriver;
+import org.apache.phoenix.parse.HintNode;
+import org.apache.phoenix.parse.HintNode.Hint;
 import org.apache.phoenix.parse.WildcardParseNode;
 import org.apache.phoenix.query.QueryServices;
 
@@ -109,21 +111,53 @@ public final class QueryUtil {
             throw new IllegalArgumentException("At least one column must be provided for upserts");
         }
 
+        final List<String> columnNames = Lists.transform(columnInfos, new Function<ColumnInfo,String>() {
+            @Override
+            public String apply(ColumnInfo columnInfo) {
+                return columnInfo.getColumnName();
+            }
+        });
+        return constructUpsertStatement(tableName, columnNames, null);
+
+    }
+    
+    /**
+     * Generate an upsert statement based on a list of {@code ColumnInfo}s with parameter markers. The list of
+     * {@code ColumnInfo}s must contain at least one element.
+     *
+     * @param tableName name of the table for which the upsert statement is to be created
+     * @param columns list of columns to be included in the upsert statement
+     * @param Hint hint to be added to the UPSERT statement.
+     * @return the created {@code UPSERT} statement
+     */
+    public static String constructUpsertStatement(String tableName, List<String> columns, Hint hint) {
+
+        if (columns.isEmpty()) {
+            throw new IllegalArgumentException("At least one column must be provided for upserts");
+        }
+        
+        String hintStr = "";
+        if(hint != null) {
+           final HintNode node = new HintNode(hint.name());
+           hintStr = node.toString();
+        }
+        
         List<String> parameterList = Lists.newArrayList();
-        for (int i = 0; i < columnInfos.size(); i++) {
+        for (int i = 0; i < columns.size(); i++) {
             parameterList.add("?");
         }
         return String.format(
-                "UPSERT INTO %s (%s) VALUES (%s)",
+                "UPSERT %s INTO %s (%s) VALUES (%s)",
+                hintStr,
                 tableName,
                 Joiner.on(", ").join(
                         Iterables.transform(
-                                columnInfos,
-                                new Function<ColumnInfo, String>() {
+                               columns,
+                                new Function<String, String>() {
                                     @Nullable
                                     @Override
-                                    public String apply(@Nullable ColumnInfo columnInfo) {
-                                        return getEscapedFullColumnName(columnInfo.getColumnName());
+                                    public String apply(@Nullable String columnName) {
+                                        return getEscapedFullColumnName(columnName);
                                     }
                                 })),
                 Joiner.on(", ").join(parameterList));

http://git-wip-us.apache.org/repos/asf/phoenix/blob/55f64eaf/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
index c9574e3..1d986c6 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/SchemaUtil.java
@@ -632,6 +632,9 @@ public class SchemaUtil {
     }
     
     public static String getEscapedFullColumnName(String fullColumnName) {
+        if(fullColumnName.startsWith(ESCAPE_CHARACTER)) {
+            return fullColumnName;
+        }
         int index = fullColumnName.indexOf(QueryConstants.NAME_SEPARATOR);
         if (index < 0) {
             return getEscapedArgument(fullColumnName); 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/55f64eaf/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvBulkImportUtilTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvBulkImportUtilTest.java b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvBulkImportUtilTest.java
index 4cb5732..a00e228 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvBulkImportUtilTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvBulkImportUtilTest.java
@@ -17,17 +17,19 @@
  */
 package org.apache.phoenix.mapreduce;
 
-import com.google.common.collect.ImmutableList;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import java.util.List;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
 import org.apache.phoenix.schema.types.PInteger;
 import org.apache.phoenix.util.ColumnInfo;
 import org.junit.Test;
 
-import java.util.List;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
+import com.google.common.collect.ImmutableList;
 
 public class CsvBulkImportUtilTest {
 
@@ -57,7 +59,7 @@ public class CsvBulkImportUtilTest {
     public void testConfigurePreUpsertProcessor() {
         Configuration conf = new Configuration();
         CsvBulkImportUtil.configurePreUpsertProcessor(conf, MockProcessor.class);
-        ImportPreUpsertKeyValueProcessor processor = CsvToKeyValueMapper.loadPreUpsertProcessor(conf);
+        ImportPreUpsertKeyValueProcessor processor = PhoenixConfigurationUtil.loadPreUpsertProcessor(conf);
         assertEquals(MockProcessor.class, processor.getClass());
     }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/55f64eaf/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapperTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapperTest.java b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapperTest.java
index ee9d0e1..4033a65 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapperTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/CsvToKeyValueMapperTest.java
@@ -17,23 +17,25 @@
  */
 package org.apache.phoenix.mapreduce;
 
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.List;
+
 import org.apache.commons.csv.CSVRecord;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
 import org.apache.phoenix.schema.types.PInteger;
 import org.apache.phoenix.schema.types.PIntegerArray;
 import org.apache.phoenix.schema.types.PUnsignedInt;
 import org.apache.phoenix.util.ColumnInfo;
 import org.junit.Test;
 
-import java.io.IOException;
-import java.util.List;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
 
 public class CsvToKeyValueMapperTest {
 
@@ -110,10 +112,10 @@ public class CsvToKeyValueMapperTest {
     @Test
     public void testLoadPreUpdateProcessor() {
         Configuration conf = new Configuration();
-        conf.setClass(CsvToKeyValueMapper.UPSERT_HOOK_CLASS_CONFKEY, MockUpsertProcessor.class,
+        conf.setClass(PhoenixConfigurationUtil.UPSERT_HOOK_CLASS_CONFKEY, MockUpsertProcessor.class,
                 ImportPreUpsertKeyValueProcessor.class);
 
-        ImportPreUpsertKeyValueProcessor processor = CsvToKeyValueMapper.loadPreUpsertProcessor(conf);
+        ImportPreUpsertKeyValueProcessor processor = PhoenixConfigurationUtil.loadPreUpsertProcessor(conf);
         assertEquals(MockUpsertProcessor.class, processor.getClass());
     }
 
@@ -121,7 +123,7 @@ public class CsvToKeyValueMapperTest {
     public void testLoadPreUpdateProcessor_NotConfigured() {
 
         Configuration conf = new Configuration();
-        ImportPreUpsertKeyValueProcessor processor = CsvToKeyValueMapper.loadPreUpsertProcessor(conf);
+        ImportPreUpsertKeyValueProcessor processor = PhoenixConfigurationUtil.loadPreUpsertProcessor(conf);
 
         assertEquals(CsvToKeyValueMapper.DefaultImportPreUpsertKeyValueProcessor.class,
                 processor.getClass());
@@ -130,9 +132,9 @@ public class CsvToKeyValueMapperTest {
     @Test(expected=IllegalStateException.class)
     public void testLoadPreUpdateProcessor_ClassNotFound() {
         Configuration conf = new Configuration();
-        conf.set(CsvToKeyValueMapper.UPSERT_HOOK_CLASS_CONFKEY, "MyUndefinedClass");
+        conf.set(PhoenixConfigurationUtil.UPSERT_HOOK_CLASS_CONFKEY, "MyUndefinedClass");
 
-        CsvToKeyValueMapper.loadPreUpsertProcessor(conf);
+        PhoenixConfigurationUtil.loadPreUpsertProcessor(conf);
     }
 
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/55f64eaf/phoenix-core/src/test/java/org/apache/phoenix/util/ColumnInfoTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/ColumnInfoTest.java b/phoenix-core/src/test/java/org/apache/phoenix/util/ColumnInfoTest.java
index 931d6fd..7f460cd 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/ColumnInfoTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/ColumnInfoTest.java
@@ -30,7 +30,7 @@ public class ColumnInfoTest {
 
     @Test
     public void testToFromStringRoundTrip() {
-        ColumnInfo columnInfo = new ColumnInfo("myColumn", Types.INTEGER);
+        ColumnInfo columnInfo = new ColumnInfo("a.myColumn", Types.INTEGER);
         assertEquals(columnInfo, ColumnInfo.fromString(columnInfo.toString()));
     }
 
@@ -49,4 +49,10 @@ public class ColumnInfoTest {
             assertEquals(SQLExceptionCode.ILLEGAL_DATA.getErrorCode(), sqlE.getErrorCode());
         }
     }
+    
+    @Test
+    public void testToFromColonInColumnName() {
+        ColumnInfo columnInfo = new ColumnInfo(":myColumn", Types.INTEGER);
+        assertEquals(columnInfo, ColumnInfo.fromString(columnInfo.toString()));
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/55f64eaf/phoenix-core/src/test/java/org/apache/phoenix/util/QueryUtilTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/QueryUtilTest.java b/phoenix-core/src/test/java/org/apache/phoenix/util/QueryUtilTest.java
index 33e3b5a..beabaf1 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/QueryUtilTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/QueryUtilTest.java
@@ -38,7 +38,7 @@ public class QueryUtilTest {
     @Test
     public void testConstructUpsertStatement_ColumnInfos() {
         assertEquals(
-                "UPSERT INTO MYTAB (\"ID\", \"NAME\") VALUES (?, ?)",
+                "UPSERT  INTO MYTAB (\"ID\", \"NAME\") VALUES (?, ?)",
                 QueryUtil.constructUpsertStatement("MYTAB", ImmutableList.of(ID_COLUMN, NAME_COLUMN)));
 
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/55f64eaf/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/PhoenixPigSchemaUtilTest.java
----------------------------------------------------------------------
diff --git a/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/PhoenixPigSchemaUtilTest.java b/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/PhoenixPigSchemaUtilTest.java
index 7a861b9..abfb442 100644
--- a/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/PhoenixPigSchemaUtilTest.java
+++ b/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/PhoenixPigSchemaUtilTest.java
@@ -35,6 +35,7 @@ import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
 import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.SchemaType;
 import org.apache.phoenix.schema.IllegalDataException;
 import org.apache.phoenix.util.ColumnInfo;
+import org.apache.phoenix.util.SchemaUtil;
 import org.apache.pig.ResourceSchema;
 import org.apache.pig.ResourceSchema.ResourceFieldSchema;
 import org.apache.pig.data.DataType;
@@ -65,10 +66,10 @@ public class PhoenixPigSchemaUtilTest {
         
         // expected schema.
         final ResourceFieldSchema[] fields = new ResourceFieldSchema[2];
-        fields[0] = new ResourceFieldSchema().setName("ID")
+        fields[0] = new ResourceFieldSchema().setName(SchemaUtil.getEscapedFullColumnName("ID"))
                                                 .setType(DataType.LONG);
 
-        fields[1] = new ResourceFieldSchema().setName("NAME")
+        fields[1] = new ResourceFieldSchema().setName(SchemaUtil.getEscapedFullColumnName("NAME"))
                                                 .setType(DataType.CHARARRAY);
         final ResourceSchema expected = new ResourceSchema().setFields(fields);