Posted to commits@phoenix.apache.org by td...@apache.org on 2016/02/08 20:42:00 UTC

phoenix git commit: PHOENIX-2605 Enhance IndexToolIT to test transactional tables

Repository: phoenix
Updated Branches:
  refs/heads/master e4d569cd8 -> b0122a541


PHOENIX-2605 Enhance IndexToolIT to test transactional tables


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/b0122a54
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/b0122a54
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/b0122a54

Branch: refs/heads/master
Commit: b0122a541325fd7e40e62e3602eb0ad748b94a4f
Parents: e4d569c
Author: Thomas D'Silva <td...@salesforce.com>
Authored: Fri Jan 29 14:10:11 2016 -0800
Committer: Thomas D'Silva <td...@salesforce.com>
Committed: Mon Feb 8 11:38:47 2016 -0800

----------------------------------------------------------------------
 .../phoenix/end2end/ContextClassloaderIT.java   |   2 +-
 .../phoenix/end2end/CsvBulkLoadToolIT.java      |  87 ++----
 .../org/apache/phoenix/end2end/IndexToolIT.java | 273 +++++++------------
 .../phoenix/end2end/MutableIndexToolIT.java     | 128 +++++++++
 .../phoenix/end2end/UserDefinedFunctionsIT.java |   8 +-
 .../end2end/index/DropIndexDuringUpsertIT.java  |   2 +-
 .../index/MutableIndexReplicationIT.java        |   2 +-
 .../example/EndToEndCoveredIndexingIT.java      |   5 +-
 .../coprocessor/BaseScannerRegionObserver.java  |   7 +
 .../phoenix/mapreduce/PhoenixInputFormat.java   |  11 +-
 .../phoenix/mapreduce/index/IndexTool.java      |   9 +-
 .../index/PhoenixIndexImportDirectMapper.java   |   7 +-
 .../index/PhoenixIndexImportMapper.java         |   6 +-
 .../util/PhoenixConfigurationUtil.java          |   6 +-
 .../apache/phoenix/schema/MetaDataClient.java   |   1 -
 .../java/org/apache/phoenix/query/BaseTest.java |  56 +++-
 16 files changed, 337 insertions(+), 273 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/b0122a54/phoenix-core/src/it/java/org/apache/phoenix/end2end/ContextClassloaderIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ContextClassloaderIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ContextClassloaderIT.java
index 7d0e1da..4c67b32 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ContextClassloaderIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ContextClassloaderIT.java
@@ -62,7 +62,7 @@ public class ContextClassloaderIT  extends BaseTest {
         String clientPort = hbaseTestUtil.getConfiguration().get(QueryServices.ZOOKEEPER_PORT_ATTRIB);
         String url = JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + LOCALHOST + JDBC_PROTOCOL_SEPARATOR + clientPort
                 + JDBC_PROTOCOL_TERMINATOR + PHOENIX_TEST_DRIVER_URL_PARAM;
-        driver = initAndRegisterDriver(url, ReadOnlyProps.EMPTY_PROPS);
+        driver = initAndRegisterTestDriver(url, ReadOnlyProps.EMPTY_PROPS);
         
         Connection conn = DriverManager.getConnection(url);
         Statement stmt = conn.createStatement();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b0122a54/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java
index 6bc03bf..26ec889 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CsvBulkLoadToolIT.java
@@ -17,7 +17,6 @@
  */
 package org.apache.phoenix.end2end;
 
-import static org.apache.phoenix.query.BaseTest.setUpConfigForMiniCluster;
 import static org.apache.phoenix.query.QueryServices.DATE_FORMAT_ATTRIB;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
@@ -30,65 +29,37 @@ import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.ResultSet;
 import java.sql.Statement;
+import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.mapred.FileAlreadyExistsException;
-import org.apache.phoenix.jdbc.PhoenixDriver;
 import org.apache.phoenix.mapreduce.CsvBulkLoadTool;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.util.DateUtil;
-import org.apache.phoenix.util.PhoenixRuntime;
-import org.junit.AfterClass;
+import org.apache.phoenix.util.ReadOnlyProps;
 import org.junit.BeforeClass;
 import org.junit.Test;
-import org.junit.experimental.categories.Category;
 
-@Category(NeedsOwnMiniClusterTest.class)
-public class CsvBulkLoadToolIT {
+import com.google.common.collect.Maps;
+
+public class CsvBulkLoadToolIT extends BaseOwnClusterHBaseManagedTimeIT {
 
-    // We use HBaseTestUtil because we need to start up a MapReduce cluster as well
-    private static HBaseTestingUtility hbaseTestUtil;
-    private static String zkQuorum;
     private static Connection conn;
+    private static String zkQuorum;
 
     @BeforeClass
-    public static void setUp() throws Exception {
-        hbaseTestUtil = new HBaseTestingUtility();
-        Configuration conf = hbaseTestUtil.getConfiguration();
-        setUpConfigForMiniCluster(conf);
-        // Since we're using the real PhoenixDriver in this test, remove the
-        // extra JDBC argument that causes the test driver to be used.
-        conf.set(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB, QueryServicesOptions.DEFAULT_EXTRA_JDBC_ARGUMENTS);
-        hbaseTestUtil.startMiniCluster();
-        hbaseTestUtil.startMiniMapReduceCluster();
-
-        Class.forName(PhoenixDriver.class.getName());
-        DriverManager.registerDriver(PhoenixDriver.INSTANCE);
-        zkQuorum = "localhost:" + hbaseTestUtil.getZkCluster().getClientPort();
-        conn = DriverManager.getConnection(PhoenixRuntime.JDBC_PROTOCOL
-                + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + zkQuorum);
-    }
-
-    @AfterClass
-    public static void tearDownAfterClass() throws Exception {
-        try {
-            if (conn != null) conn.close();
-        } finally {
-            try {
-                DriverManager.deregisterDriver(PhoenixDriver.INSTANCE);
-            } finally {
-                try {
-                    hbaseTestUtil.shutdownMiniMapReduceCluster();
-                } finally {
-                    hbaseTestUtil.shutdownMiniCluster();
-                }
-            }
-        }
+    public static void doSetup() throws Exception {
+        Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(1);
+        serverProps.put(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB, QueryServicesOptions.DEFAULT_EXTRA_JDBC_ARGUMENTS);
+        Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(1);
+        clientProps.put(QueryServices.TRANSACTIONS_ENABLED, "true");
+        setUpRealDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(clientProps.entrySet().iterator()));
+        zkQuorum = "localhost:" + getUtility().getZkCluster().getClientPort();
+        conn = DriverManager.getConnection(getUrl());
     }
 
     @Test
@@ -97,7 +68,7 @@ public class CsvBulkLoadToolIT {
         Statement stmt = conn.createStatement();
         stmt.execute("CREATE TABLE S.TABLE1 (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, T DATE) SPLIT ON (1,2)");
 
-        FileSystem fs = FileSystem.get(hbaseTestUtil.getConfiguration());
+        FileSystem fs = FileSystem.get(getUtility().getConfiguration());
         FSDataOutputStream outputStream = fs.create(new Path("/tmp/input1.csv"));
         PrintWriter printWriter = new PrintWriter(outputStream);
         printWriter.println("1,Name 1,1970/01/01");
@@ -105,7 +76,7 @@ public class CsvBulkLoadToolIT {
         printWriter.close();
 
         CsvBulkLoadTool csvBulkLoadTool = new CsvBulkLoadTool();
-        csvBulkLoadTool.setConf(new Configuration(hbaseTestUtil.getConfiguration()));
+        csvBulkLoadTool.setConf(new Configuration(getUtility().getConfiguration()));
         csvBulkLoadTool.getConf().set(DATE_FORMAT_ATTRIB,"yyyy/MM/dd");
         int exitCode = csvBulkLoadTool.run(new String[] {
                 "--input", "/tmp/input1.csv",
@@ -136,7 +107,7 @@ public class CsvBulkLoadToolIT {
         stmt.execute("CREATE TABLE TABLE2 (ID INTEGER NOT NULL PRIMARY KEY, " +
                 "NAME VARCHAR, NAMES VARCHAR ARRAY)");
 
-        FileSystem fs = FileSystem.get(hbaseTestUtil.getConfiguration());
+        FileSystem fs = FileSystem.get(getUtility().getConfiguration());
         FSDataOutputStream outputStream = fs.create(new Path("/tmp/input2.csv"));
         PrintWriter printWriter = new PrintWriter(outputStream);
         printWriter.println("1|Name 1a;Name 1b");
@@ -144,7 +115,7 @@ public class CsvBulkLoadToolIT {
         printWriter.close();
 
         CsvBulkLoadTool csvBulkLoadTool = new CsvBulkLoadTool();
-        csvBulkLoadTool.setConf(hbaseTestUtil.getConfiguration());
+        csvBulkLoadTool.setConf(getUtility().getConfiguration());
         int exitCode = csvBulkLoadTool.run(new String[] {
                 "--input", "/tmp/input2.csv",
                 "--table", "table2",
@@ -173,7 +144,7 @@ public class CsvBulkLoadToolIT {
         Statement stmt = conn.createStatement();
         stmt.execute("CREATE TABLE TABLE7 (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, T DATE) SPLIT ON (1,2)");
 
-        FileSystem fs = FileSystem.get(hbaseTestUtil.getConfiguration());
+        FileSystem fs = FileSystem.get(getUtility().getConfiguration());
         FSDataOutputStream outputStream = fs.create(new Path("/tmp/input1.csv"));
         PrintWriter printWriter = new PrintWriter(outputStream);
         printWriter.println("1,Name 1,1970/01/01");
@@ -184,7 +155,7 @@ public class CsvBulkLoadToolIT {
         printWriter.close();
 
         CsvBulkLoadTool csvBulkLoadTool = new CsvBulkLoadTool();
-        csvBulkLoadTool.setConf(new Configuration(hbaseTestUtil.getConfiguration()));
+        csvBulkLoadTool.setConf(new Configuration(getUtility().getConfiguration()));
         csvBulkLoadTool.getConf().set(DATE_FORMAT_ATTRIB,"yyyy/MM/dd");
         int exitCode = csvBulkLoadTool.run(new String[] {
             "--input", "/tmp/input1.csv,/tmp/input2.csv",
@@ -218,7 +189,7 @@ public class CsvBulkLoadToolIT {
                 + " INCLUDE (LAST_NAME)";
         stmt.execute(ddl);
         
-        FileSystem fs = FileSystem.get(hbaseTestUtil.getConfiguration());
+        FileSystem fs = FileSystem.get(getUtility().getConfiguration());
         FSDataOutputStream outputStream = fs.create(new Path("/tmp/input3.csv"));
         PrintWriter printWriter = new PrintWriter(outputStream);
         printWriter.println("1,FirstName 1,LastName 1");
@@ -226,7 +197,7 @@ public class CsvBulkLoadToolIT {
         printWriter.close();
 
         CsvBulkLoadTool csvBulkLoadTool = new CsvBulkLoadTool();
-        csvBulkLoadTool.setConf(hbaseTestUtil.getConfiguration());
+        csvBulkLoadTool.setConf(getUtility().getConfiguration());
         int exitCode = csvBulkLoadTool.run(new String[] {
                 "--input", "/tmp/input3.csv",
                 "--table", "table3",
@@ -254,7 +225,7 @@ public class CsvBulkLoadToolIT {
         ddl = "CREATE LOCAL INDEX TABLE6_IDX2 ON TABLE6 " + " (LAST_NAME ASC)";
         stmt.execute(ddl);
 
-        FileSystem fs = FileSystem.get(hbaseTestUtil.getConfiguration());
+        FileSystem fs = FileSystem.get(getUtility().getConfiguration());
         FSDataOutputStream outputStream = fs.create(new Path("/tmp/input3.csv"));
         PrintWriter printWriter = new PrintWriter(outputStream);
         printWriter.println("1,FirstName 1,LastName 1");
@@ -262,7 +233,7 @@ public class CsvBulkLoadToolIT {
         printWriter.close();
 
         CsvBulkLoadTool csvBulkLoadTool = new CsvBulkLoadTool();
-        csvBulkLoadTool.setConf(hbaseTestUtil.getConfiguration());
+        csvBulkLoadTool.setConf(getUtility().getConfiguration());
         try {
             csvBulkLoadTool.run(new String[] {
                     "--input", "/tmp/input3.csv",
@@ -296,7 +267,7 @@ public class CsvBulkLoadToolIT {
                         + tableName + "(FIRST_NAME ASC)";
         stmt.execute(ddl);
 
-        FileSystem fs = FileSystem.get(hbaseTestUtil.getConfiguration());
+        FileSystem fs = FileSystem.get(getUtility().getConfiguration());
         FSDataOutputStream outputStream = fs.create(new Path("/tmp/input4.csv"));
         PrintWriter printWriter = new PrintWriter(outputStream);
         printWriter.println("1,FirstName 1,LastName 1");
@@ -304,7 +275,7 @@ public class CsvBulkLoadToolIT {
         printWriter.close();
 
         CsvBulkLoadTool csvBulkLoadTool = new CsvBulkLoadTool();
-        csvBulkLoadTool.setConf(hbaseTestUtil.getConfiguration());
+        csvBulkLoadTool.setConf(getUtility().getConfiguration());
         int exitCode = csvBulkLoadTool.run(new String[] {
                 "--input", "/tmp/input4.csv",
                 "--table", tableName,
@@ -326,7 +297,7 @@ public class CsvBulkLoadToolIT {
     public void testInvalidArguments() {
         String tableName = "TABLE8";
         CsvBulkLoadTool csvBulkLoadTool = new CsvBulkLoadTool();
-        csvBulkLoadTool.setConf(hbaseTestUtil.getConfiguration());
+        csvBulkLoadTool.setConf(getUtility().getConfiguration());
         try {
             csvBulkLoadTool.run(new String[] {
                 "--input", "/tmp/input4.csv",
@@ -348,7 +319,7 @@ public class CsvBulkLoadToolIT {
             stmt.execute("CREATE TABLE " + tableName + "(ID INTEGER NOT NULL PRIMARY KEY, "
                     + "FIRST_NAME VARCHAR, LAST_NAME VARCHAR)");
             
-            FileSystem fs = FileSystem.get(hbaseTestUtil.getConfiguration());
+            FileSystem fs = FileSystem.get(getUtility().getConfiguration());
             fs.create(new Path(outputPath));
             FSDataOutputStream outputStream = fs.create(new Path("/tmp/input9.csv"));
             PrintWriter printWriter = new PrintWriter(outputStream);
@@ -357,7 +328,7 @@ public class CsvBulkLoadToolIT {
             printWriter.close();
             
             CsvBulkLoadTool csvBulkLoadTool = new CsvBulkLoadTool();
-            csvBulkLoadTool.setConf(hbaseTestUtil.getConfiguration());
+            csvBulkLoadTool.setConf(getUtility().getConfiguration());
             csvBulkLoadTool.run(new String[] {
                 "--input", "/tmp/input9.csv",
                 "--output", outputPath,

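The setup refactoring above replaces the hand-rolled mini cluster with the shared BaseOwnClusterHBaseManagedTimeIT infrastructure and enables transactions on the client. A minimal sketch (illustrative only, not part of this commit) of what that client property buys an individual test, assuming getUrl() from BaseTest and that QueryServices.TRANSACTIONS_ENABLED is the key "phoenix.transactions.enabled":

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.Statement;
    import java.util.Properties;

    // Sketch only: create a transactional table through the real driver.
    Properties props = new Properties();
    props.setProperty("phoenix.transactions.enabled", "true"); // assumed key
    try (Connection conn = DriverManager.getConnection(getUrl(), props);
         Statement stmt = conn.createStatement()) {
        stmt.execute("CREATE TABLE T_TXN (ID INTEGER PRIMARY KEY, NAME VARCHAR) TRANSACTIONAL=true");
    }
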
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b0122a54/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
index 062b303..aba9c11 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
@@ -17,9 +17,9 @@
  */
 package org.apache.phoenix.end2end;
 
-import static org.apache.phoenix.query.BaseTest.setUpConfigForMiniCluster;
 import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 import java.sql.Connection;
@@ -28,117 +28,114 @@ import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 import java.util.UUID;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.phoenix.jdbc.PhoenixDriver;
 import org.apache.phoenix.mapreduce.index.IndexTool;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.util.MetaDataUtil;
-import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
 import org.apache.phoenix.util.SchemaUtil;
-import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
-import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
 
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 
 /**
  * Tests for the {@link IndexTool}
  */
-@Category(NeedsOwnMiniClusterTest.class)
-public class IndexToolIT {
+@RunWith(Parameterized.class)
+public class IndexToolIT extends BaseOwnClusterHBaseManagedTimeIT {
     
-    private static HBaseTestingUtility hbaseTestUtil;
-    private static String zkQuorum;
-  
-    @BeforeClass
-    public static void setUp() throws Exception {
-        hbaseTestUtil = new HBaseTestingUtility();
-        Configuration conf = hbaseTestUtil.getConfiguration();
-        conf.setBoolean("hbase.defaults.for.version.skip", true);
-        // Since we're using the real PhoenixDriver in this test, remove the
-        // extra JDBC argument that causes the test driver to be used.
-        conf.set(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB, QueryServicesOptions.DEFAULT_EXTRA_JDBC_ARGUMENTS);
-        setUpConfigForMiniCluster(conf);
-        hbaseTestUtil.startMiniCluster();
-        hbaseTestUtil.startMiniMapReduceCluster();
-        Class.forName(PhoenixDriver.class.getName());
-        DriverManager.registerDriver(PhoenixDriver.INSTANCE);
-        zkQuorum = "localhost:" + hbaseTestUtil.getZkCluster().getClientPort();
-    }
-    
-    @Test
-    public void testImmutableGlobalIndex() throws Exception {
-        testSecondaryIndex("SCHEMA", "DATA_TABLE1", true, false);
-    }
-    
-    @Test
-    public void testImmutableLocalIndex() throws Exception {
-        testSecondaryIndex("SCHEMA", "DATA_TABLE2", true, true);
-    }
-    
-    @Test
-    public void testMutableGlobalIndex() throws Exception {
-        testSecondaryIndex("SCHEMA", "DATA_TABLE3", false, false);
-    }
+    private final String schemaName;
+    private final String dataTable;
     
-    @Test
-    public void testMutableLocalIndex() throws Exception {
-        testSecondaryIndex("SCHEMA", "DATA_TABLE4", false, true);
-    }
+    private final boolean localIndex;
+    private final boolean transactional;
+    private final boolean directApi;
+    private final String tableDDLOptions;
     
-    @Test
-    public void testImmutableGlobalIndexDirectApi() throws Exception {
-    	testSecondaryIndex("SCHEMA", "DATA_TABLE5", true, false, true);
+    public IndexToolIT(boolean transactional, boolean localIndex, boolean mutable, boolean directApi) {
+        this.schemaName = "S";
+        this.dataTable = "T" + (transactional ? "_TXN" : "");
+        this.localIndex = localIndex;
+        this.transactional = transactional;
+        this.directApi = directApi;
+        StringBuilder optionBuilder = new StringBuilder();
+        if (!mutable) 
+            optionBuilder.append(" IMMUTABLE_ROWS=true ");
+        if (transactional) {
+            if (!(optionBuilder.length()==0))
+                optionBuilder.append(",");
+            optionBuilder.append(" TRANSACTIONAL=true ");
+        }
+        this.tableDDLOptions = optionBuilder.toString();
     }
     
-    @Test
-    public void testImmutableLocalIndexDirectApi() throws Exception {
-    	testSecondaryIndex("SCHEMA", "DATA_TABLE6", true, true, true);
+    @BeforeClass
+    public static void doSetup() throws Exception {
+        Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(1);
+        serverProps.put(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB, QueryServicesOptions.DEFAULT_EXTRA_JDBC_ARGUMENTS);
+        Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(1);
+        clientProps.put(QueryServices.TRANSACTIONS_ENABLED, "true");
+        setUpRealDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), new ReadOnlyProps(clientProps.entrySet().iterator()));
     }
     
-    @Test
-    public void testMutableGlobalIndexDirectApi() throws Exception {
-    	testSecondaryIndex("SCHEMA", "DATA_TABLE7", false, false, true);
+    @Parameters(name="transactional = {0} , mutable = {1} , localIndex = {2}, directApi = {3}")
+    public static Collection<Boolean[]> data() {
+        return Arrays.asList(new Boolean[][] {     
+                 { false, false, false, false }, { false, false, false, true }, { false, false, true, false }, { false, false, true, true }, 
+                 { false, true, false, false }, { false, true, false, true }, { false, true, true, false }, { false, true, true, true }, 
+                 { true, false, false, false }, { true, false, false, true }, { true, false, true, false }, { true, false, true, true }, 
+                 { true, true, false, false }, { true, true, false, true }, { true, true, true, false }, { true, true, true, true }
+           });
     }
     
     @Test
-    public void testMutableLocalIndexDirectApi() throws Exception {
-    	testSecondaryIndex("SCHEMA", "DATA_TABLE8", false, true, true);
-    }
-    
-    public void testSecondaryIndex(final String schemaName, final String dataTable, final boolean isImmutable , final boolean isLocal) throws Exception {
-    	testSecondaryIndex(schemaName, dataTable, isImmutable, isLocal, false);
-    }
-    
-    public void testSecondaryIndex(final String schemaName, final String dataTable, final boolean isImmutable , final boolean isLocal, final boolean directApi) throws Exception {
-        
-    	final String fullTableName = SchemaUtil.getTableName(schemaName, dataTable);
-        final String indxTable = String.format("%s_%s",dataTable,"INDX");
+    public void testSecondaryIndex() throws Exception {
+        final String fullTableName = SchemaUtil.getTableName(schemaName, dataTable);
+        final String indxTable = String.format("%s_%s", dataTable, "INDX");
         Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        Connection conn = DriverManager.getConnection(PhoenixRuntime.JDBC_PROTOCOL + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + zkQuorum,props);
+        props.setProperty(QueryServices.TRANSACTIONS_ENABLED, "true");
+        Connection conn = DriverManager.getConnection(getUrl(), props);
         Statement stmt = conn.createStatement();
         try {
         
-            stmt.execute(String.format("CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER) %s", fullTableName, (isImmutable ? "IMMUTABLE_ROWS=true" :"")));
+            stmt.execute(String.format("CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER) %s", fullTableName, tableDDLOptions));
             String upsertQuery = String.format("UPSERT INTO %s VALUES(?, ?, ?)", fullTableName);
             PreparedStatement stmt1 = conn.prepareStatement(upsertQuery);
             
-            int id = 1;
             // insert two rows
-            upsertRow(stmt1, id++);
-            upsertRow(stmt1, id++);
+            upsertRow(stmt1, 1);
+            upsertRow(stmt1, 2);
             conn.commit();
             
-            stmt.execute(String.format("CREATE %s INDEX %s ON %s  (LPAD(UPPER(NAME),8,'x')||'_xyz') ASYNC ", (isLocal ? "LOCAL" : ""), indxTable, fullTableName));
+            if (transactional) {
+                // insert two rows in another connection without committing so that they are not visible to other transactions
+                try (Connection conn2 = DriverManager.getConnection(getUrl(), props)) {
+                    PreparedStatement stmt2 = conn.prepareStatement(upsertQuery);
+                    upsertRow(stmt2, 5);
+                    upsertRow(stmt2, 6);
+                    ResultSet rs = conn.createStatement().executeQuery("SELECT count(*) from "+fullTableName);
+                    assertTrue(rs.next());
+                    assertEquals("Unexpected row count ", 4, rs.getInt(1));
+                    assertFalse(rs.next());
+                }
+            }
+            
+            stmt.execute(String.format("CREATE %s INDEX %s ON %s  (LPAD(UPPER(NAME),8,'x')||'_xyz') ASYNC ", (localIndex ? "LOCAL" : ""), indxTable, fullTableName));
    
             //verify rows are fetched from data table.
             String selectSql = String.format("SELECT LPAD(UPPER(NAME),8,'x')||'_xyz',ID FROM %s", fullTableName);
@@ -153,10 +150,12 @@ public class IndexToolIT {
             assertEquals("xxUNAME1_xyz", rs.getString(1));    
             assertTrue(rs.next());
             assertEquals("xxUNAME2_xyz", rs.getString(1));
+            assertFalse(rs.next());
+            conn.commit();
            
             //run the index MR job.
             final IndexTool indexingTool = new IndexTool();
-            indexingTool.setConf(new Configuration(hbaseTestUtil.getConfiguration()));
+            indexingTool.setConf(new Configuration(getUtility().getConfiguration()));
             
             final String[] cmdArgs = getArgValues(schemaName, dataTable, indxTable, directApi);
             int status = indexingTool.run(cmdArgs);
@@ -167,115 +166,39 @@ public class IndexToolIT {
             upsertRow(stmt1, 4);
             conn.commit();
             
-            rs = stmt1.executeQuery("SELECT * FROM "+SchemaUtil.getTableName(schemaName, indxTable));
+            rs = stmt1.executeQuery("SELECT LPAD(UPPER(NAME),8,'x')||'_xyz',ID FROM "+fullTableName);
 
             //assert we are pulling from index table.
             rs = conn.createStatement().executeQuery("EXPLAIN " + selectSql);
             actualExplainPlan = QueryUtil.getExplainPlan(rs);
-            assertExplainPlan(actualExplainPlan,schemaName,dataTable,indxTable,isLocal);
+            assertExplainPlan(actualExplainPlan, schemaName, dataTable, indxTable, localIndex);
             
             rs = stmt.executeQuery(selectSql);
-//            assertTrue(rs.next());
-//            assertEquals("xxUNAME1_xyz", rs.getString(1));
-//            assertEquals(1, rs.getInt(2));
-//            
-//            assertTrue(rs.next());
-//            assertEquals("xxUNAME2_xyz", rs.getString(1));
-//            assertEquals(2, rs.getInt(2));
-//
-//            assertTrue(rs.next());
-//            assertEquals("xxUNAME3_xyz", rs.getString(1));
-//            assertEquals(3, rs.getInt(2));
-//            
-//            assertTrue(rs.next());
-//            assertEquals("xxUNAME4_xyz", rs.getString(1));
-//            assertEquals(4, rs.getInt(2));
-//      
-//            assertFalse(rs.next());
-            
-            conn.createStatement().execute(String.format("DROP INDEX  %s ON %s",indxTable , fullTableName));
-        } finally {
-            conn.close();
-        }
-    }
-    
-    
-    /**
-     * This test is to assert that updates that happen to rows of a mutable table after an index is created in ASYNC mode and before
-     * the MR job runs, do show up in the index table . 
-     * @throws Exception
-     */
-    @Test
-    public void testMutalbleIndexWithUpdates() throws Exception {
-        
-        final String dataTable = "DATA_TABLE5";
-        final String indxTable = String.format("%s_%s",dataTable,"INDX");
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        Connection conn = DriverManager.getConnection(PhoenixRuntime.JDBC_PROTOCOL + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + zkQuorum,props);
-        Statement stmt = conn.createStatement();
-        try {
-        
-            stmt.execute(String.format("CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER)",dataTable));
-            String upsertQuery = String.format("UPSERT INTO %s VALUES(?, ?, ?)",dataTable);
-            PreparedStatement stmt1 = conn.prepareStatement(upsertQuery);
-            
-            int id = 1;
-            // insert two rows
-            upsertRow(stmt1, id++);
-            upsertRow(stmt1, id++);
-            conn.commit();
-            
-            stmt.execute(String.format("CREATE INDEX %s ON %s (UPPER(NAME)) ASYNC ", indxTable,dataTable));
-            
-            //update a row 
-            stmt1.setInt(1, 1);
-            stmt1.setString(2, "uname" + String.valueOf(10));
-            stmt1.setInt(3, 95050 + 1);
-            stmt1.executeUpdate();
-            conn.commit();  
-            
-            //verify rows are fetched from data table.
-            String selectSql = String.format("SELECT UPPER(NAME),ID FROM %s",dataTable);
-            ResultSet rs = conn.createStatement().executeQuery("EXPLAIN " + selectSql);
-            String actualExplainPlan = QueryUtil.getExplainPlan(rs);
-            
-            //assert we are pulling from data table.
-            assertEquals(String.format("CLIENT 1-CHUNK PARALLEL 1-WAY FULL SCAN OVER %s",dataTable),actualExplainPlan);
+            assertTrue(rs.next());
+            assertEquals("xxUNAME1_xyz", rs.getString(1));
+            assertEquals(1, rs.getInt(2));
             
-            rs = stmt1.executeQuery(selectSql);
             assertTrue(rs.next());
-            assertEquals("UNAME10", rs.getString(1));
+            assertEquals("xxUNAME2_xyz", rs.getString(1));
+            assertEquals(2, rs.getInt(2));
+
             assertTrue(rs.next());
-            assertEquals("UNAME2", rs.getString(1));
-           
-            //run the index MR job.
-            final IndexTool indexingTool = new IndexTool();
-            indexingTool.setConf(new Configuration(hbaseTestUtil.getConfiguration()));
-            
-            final String[] cmdArgs = getArgValues(null, dataTable,indxTable);
-            int status = indexingTool.run(cmdArgs);
-            assertEquals(0, status);
-            
-            //assert we are pulling from index table.
-            rs = conn.createStatement().executeQuery("EXPLAIN " + selectSql);
-            actualExplainPlan = QueryUtil.getExplainPlan(rs);
-            assertExplainPlan(actualExplainPlan,null,dataTable,indxTable,false);
+            assertEquals("xxUNAME3_xyz", rs.getString(1));
+            assertEquals(3, rs.getInt(2));
             
-            rs = stmt.executeQuery(selectSql);
             assertTrue(rs.next());
-            assertEquals("UNAME10", rs.getString(1));
-            assertEquals(1, rs.getInt(2));
+            assertEquals("xxUNAME4_xyz", rs.getString(1));
+            assertEquals(4, rs.getInt(2));
+      
+            assertFalse(rs.next());
             
-            assertTrue(rs.next());
-            assertEquals("UNAME2", rs.getString(1));
-            assertEquals(2, rs.getInt(2));
-            conn.createStatement().execute(String.format("DROP INDEX  %s ON %s",indxTable , dataTable));
+            conn.createStatement().execute(String.format("DROP INDEX  %s ON %s",indxTable , fullTableName));
         } finally {
             conn.close();
         }
     }
     
-    private void assertExplainPlan(final String actualExplainPlan, String schemaName, String dataTable,
+    public static void assertExplainPlan(final String actualExplainPlan, String schemaName, String dataTable,
             String indxTable, boolean isLocal) {
         
         String expectedExplainPlan = "";
@@ -290,11 +213,11 @@ public class IndexToolIT {
         assertEquals(expectedExplainPlan,actualExplainPlan);
     }
 
-    private String[] getArgValues(String schemaName, String dataTable, String indxTable) {
+    public static String[] getArgValues(String schemaName, String dataTable, String indxTable) {
         return getArgValues(schemaName, dataTable, indxTable, false);
     }
     
-    private String[] getArgValues(String schemaName, String dataTable, String indxTable, boolean directApi) {
+    public static String[] getArgValues(String schemaName, String dataTable, String indxTable, boolean directApi) {
         final List<String> args = Lists.newArrayList();
         if (schemaName!=null) {
             args.add("-s");
@@ -315,24 +238,12 @@ public class IndexToolIT {
         return args.toArray(new String[0]);
     }
 
-    private void upsertRow(PreparedStatement stmt, int i) throws SQLException {
+    public static void upsertRow(PreparedStatement stmt, int i) throws SQLException {
         // insert row
         stmt.setInt(1, i);
         stmt.setString(2, "uname" + String.valueOf(i));
         stmt.setInt(3, 95050 + i);
         stmt.executeUpdate();
     }
-
-    @AfterClass
-    public static void tearDownAfterClass() throws Exception {
-        try {
-            DriverManager.deregisterDriver(PhoenixDriver.INSTANCE);
-        } finally {
-            try {
-                hbaseTestUtil.shutdownMiniMapReduceCluster();
-            } finally {
-                hbaseTestUtil.shutdownMiniCluster();
-            }
-        }
-    }
+    
 }

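The data() method above enumerates all 16 combinations of the four boolean flags. An equivalent generated form (a sketch only; the commit lists the rows explicitly, which keeps each Parameterized case readable at a glance):

    // Sketch: the 16 rows are the cross product {false,true}^4.
    List<Boolean[]> rows = Lists.newArrayList();
    for (boolean transactional : new boolean[] { false, true })
        for (boolean flag1 : new boolean[] { false, true })
            for (boolean flag2 : new boolean[] { false, true })
                for (boolean directApi : new boolean[] { false, true })
                    rows.add(new Boolean[] { transactional, flag1, flag2, directApi });
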
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b0122a54/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutableIndexToolIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutableIndexToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutableIndexToolIT.java
new file mode 100644
index 0000000..0791479
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutableIndexToolIT.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.phoenix.mapreduce.index.IndexTool;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.google.common.collect.Maps;
+
+public class MutableIndexToolIT extends BaseOwnClusterHBaseManagedTimeIT {
+    
+    @BeforeClass
+    public static void doSetup() throws Exception {
+        Map<String, String> serverProps = Maps.newHashMapWithExpectedSize(1);
+        serverProps.put(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB, QueryServicesOptions.DEFAULT_EXTRA_JDBC_ARGUMENTS);
+        setUpRealDriver(new ReadOnlyProps(serverProps.entrySet().iterator()), ReadOnlyProps.EMPTY_PROPS);
+    }
+
+    /**
+     * This test is to assert that updates that happen to rows of a mutable table after an index is created in ASYNC mode and before
+     * the MR job runs, do show up in the index table . 
+     * @throws Exception
+     */
+    @Test
+    public void testMutableIndexWithUpdates() throws Exception {
+        
+        final String dataTable = "DATA_TABLE5";
+        final String indxTable = String.format("%s_%s",dataTable,"INDX");
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        Statement stmt = conn.createStatement();
+        try {
+        
+            stmt.execute(String.format("CREATE TABLE %s (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, ZIP INTEGER)",dataTable));
+            String upsertQuery = String.format("UPSERT INTO %s VALUES(?, ?, ?)",dataTable);
+            PreparedStatement stmt1 = conn.prepareStatement(upsertQuery);
+            
+            int id = 1;
+            // insert two rows
+            IndexToolIT.upsertRow(stmt1, id++);
+            IndexToolIT.upsertRow(stmt1, id++);
+            conn.commit();
+            
+            stmt.execute(String.format("CREATE INDEX %s ON %s (UPPER(NAME)) ASYNC ", indxTable,dataTable));
+            
+            //update a row 
+            stmt1.setInt(1, 1);
+            stmt1.setString(2, "uname" + String.valueOf(10));
+            stmt1.setInt(3, 95050 + 1);
+            stmt1.executeUpdate();
+            conn.commit();  
+            
+            //verify rows are fetched from data table.
+            String selectSql = String.format("SELECT UPPER(NAME),ID FROM %s",dataTable);
+            ResultSet rs = conn.createStatement().executeQuery("EXPLAIN " + selectSql);
+            String actualExplainPlan = QueryUtil.getExplainPlan(rs);
+            
+            //assert we are pulling from data table.
+            assertEquals(String.format("CLIENT 1-CHUNK PARALLEL 1-WAY FULL SCAN OVER %s",dataTable),actualExplainPlan);
+            
+            rs = stmt1.executeQuery(selectSql);
+            assertTrue(rs.next());
+            assertEquals("UNAME10", rs.getString(1));
+            assertTrue(rs.next());
+            assertEquals("UNAME2", rs.getString(1));
+           
+            //run the index MR job.
+            final IndexTool indexingTool = new IndexTool();
+            indexingTool.setConf(new Configuration(getUtility().getConfiguration()));
+            
+            final String[] cmdArgs = IndexToolIT.getArgValues(null, dataTable,indxTable);
+            int status = indexingTool.run(cmdArgs);
+            assertEquals(0, status);
+            
+            //assert we are pulling from index table.
+            rs = conn.createStatement().executeQuery("EXPLAIN " + selectSql);
+            actualExplainPlan = QueryUtil.getExplainPlan(rs);
+            IndexToolIT.assertExplainPlan(actualExplainPlan,null,dataTable,indxTable,false);
+            
+            rs = stmt.executeQuery(selectSql);
+            assertTrue(rs.next());
+            assertEquals("UNAME10", rs.getString(1));
+            assertEquals(1, rs.getInt(2));
+            
+            assertTrue(rs.next());
+            assertEquals("UNAME2", rs.getString(1));
+            assertEquals(2, rs.getInt(2));
+            conn.createStatement().execute(String.format("DROP INDEX  %s ON %s",indxTable , dataTable));
+        } finally {
+            conn.close();
+        }
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b0122a54/phoenix-core/src/it/java/org/apache/phoenix/end2end/UserDefinedFunctionsIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UserDefinedFunctionsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UserDefinedFunctionsIT.java
index 4d1c5e4..4eee422 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UserDefinedFunctionsIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UserDefinedFunctionsIT.java
@@ -30,12 +30,9 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-import java.io.BufferedInputStream;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
-import java.io.InputStream;
-import java.io.OutputStream;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.PreparedStatement;
@@ -54,11 +51,8 @@ import javax.tools.JavaCompiler;
 import javax.tools.ToolProvider;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.io.IOUtils;
 import org.apache.phoenix.expression.function.UDFExpression;
 import org.apache.phoenix.jdbc.PhoenixTestDriver;
 import org.apache.phoenix.query.QueryServices;
@@ -259,7 +253,7 @@ public class UserDefinedFunctionsIT extends BaseOwnClusterIT{
         Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
         props.put(QueryServices.ALLOW_USER_DEFINED_FUNCTIONS_ATTRIB, "true");
         props.put(QueryServices.DYNAMIC_JARS_DIR_KEY,string+"/hbase/tmpjars/");
-        driver = initAndRegisterDriver(url, new ReadOnlyProps(props.entrySet().iterator()));
+        driver = initAndRegisterTestDriver(url, new ReadOnlyProps(props.entrySet().iterator()));
         compileTestClass(MY_REVERSE_CLASS_NAME, MY_REVERSE_PROGRAM, 1);
         compileTestClass(MY_SUM_CLASS_NAME, MY_SUM_PROGRAM, 2);
         compileTestClass(MY_ARRAY_INDEX_CLASS_NAME, MY_ARRAY_INDEX_PROGRAM, 3);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b0122a54/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/DropIndexDuringUpsertIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/DropIndexDuringUpsertIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/DropIndexDuringUpsertIT.java
index df28e65..acc089b 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/DropIndexDuringUpsertIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/DropIndexDuringUpsertIT.java
@@ -91,7 +91,7 @@ public abstract class DropIndexDuringUpsertIT extends BaseTest {
         Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
         // Must update config before starting server
         props.put(QueryServices.DROP_METADATA_ATTRIB, Boolean.toString(true));
-        driver = initAndRegisterDriver(url, new ReadOnlyProps(props.entrySet().iterator()));
+        driver = initAndRegisterTestDriver(url, new ReadOnlyProps(props.entrySet().iterator()));
     }
 
     @After

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b0122a54/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexReplicationIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexReplicationIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexReplicationIT.java
index a4f4682..2568566 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexReplicationIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexReplicationIT.java
@@ -180,7 +180,7 @@ public class MutableIndexReplicationIT extends BaseTest {
         // Must update config before starting server
         URL = getLocalClusterUrl(utility1);
         LOG.info("Connecting driver to "+URL);
-        driver = initAndRegisterDriver(URL, new ReadOnlyProps(props.entrySet().iterator()));
+        driver = initAndRegisterTestDriver(URL, new ReadOnlyProps(props.entrySet().iterator()));
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b0122a54/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/example/EndToEndCoveredIndexingIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/example/EndToEndCoveredIndexingIT.java b/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/example/EndToEndCoveredIndexingIT.java
index 01812f3..5102dc8 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/example/EndToEndCoveredIndexingIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/example/EndToEndCoveredIndexingIT.java
@@ -17,8 +17,7 @@
  */
 package org.apache.phoenix.hbase.index.covered.example;
 
-
-import static org.apache.phoenix.query.BaseTest.initAndRegisterDriver;
+import static org.apache.phoenix.query.BaseTest.initAndRegisterTestDriver;
 import static org.apache.phoenix.query.BaseTest.setUpConfigForMiniCluster;
 
 import java.io.IOException;
@@ -123,7 +122,7 @@ public class EndToEndCoveredIndexingIT {
                 PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + zkQuorum + 
                 PhoenixRuntime.JDBC_PROTOCOL_TERMINATOR + 
                 conf.get(QueryServices.EXTRA_JDBC_ARGUMENTS_ATTRIB, QueryServicesOptions.DEFAULT_EXTRA_JDBC_ARGUMENTS);
-        initAndRegisterDriver(url, ReadOnlyProps.EMPTY_PROPS);
+        initAndRegisterTestDriver(url, ReadOnlyProps.EMPTY_PROPS);
     }
 
     @BeforeClass

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b0122a54/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index c95cd5d..a363459 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.io.TimeRange;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.regionserver.ScannerContext;
@@ -95,6 +96,7 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
     public static final String SCAN_REGION_SERVER = "_SCAN_REGION_SERVER";
     public static final String RUN_UPDATE_STATS_ASYNC_ATTRIB = "_RunUpdateStatsAsync";
     public static final String SKIP_REGION_BOUNDARY_CHECK = "_SKIP_REGION_BOUNDARY_CHECK";
+    public static final String TX_SCN = "_TxScn";
     
     /**
      * Attribute name used to pass custom annotations in Scans and Mutations (later). Custom annotations
@@ -154,6 +156,11 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver {
     @Override
     public RegionScanner preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
         final Scan scan, final RegionScanner s) throws IOException {
+        byte[] txnScn = scan.getAttribute(TX_SCN);
+        if (txnScn!=null) {
+            TimeRange timeRange = scan.getTimeRange();
+            scan.setTimeRange(timeRange.getMin(), Bytes.toLong(txnScn));
+        }
         if (isRegionObserverFor(scan)) {
             if (! skipRegionBoundaryCheck(scan)) {
                 throwIfScanOutOfRegion(scan, c.getEnvironment().getRegion());

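For context: the new TX_SCN attribute ("_TxScn") lets a client attach a transactional read point to a Scan, and preScannerOpen() above caps the scan's time range at that value. A client-side sketch (illustrative; the value is assumed to already be in the transaction manager's timestamp space):

    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.util.Bytes;

    Scan scan = new Scan();
    long txUpperBound = 1454968727000L * 1000000L; // assumed example: millis * 1,000,000
    scan.setAttribute("_TxScn", Bytes.toBytes(txUpperBound));
    // Server side, the observer then effectively performs:
    //   scan.setTimeRange(scan.getTimeRange().getMin(), Bytes.toLong(txnScn));
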
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b0122a54/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java
index 8ee1634..5882c14 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java
@@ -37,11 +37,13 @@ import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.db.DBWritable;
 import org.apache.phoenix.compile.QueryPlan;
+import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
 import org.apache.phoenix.iterate.MapReduceParallelScanGrouper;
 import org.apache.phoenix.jdbc.PhoenixStatement;
 import org.apache.phoenix.mapreduce.util.ConnectionUtil;
 import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
 import org.apache.phoenix.query.KeyRange;
+import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.util.PhoenixRuntime;
 
 import com.google.common.base.Preconditions;
@@ -104,11 +106,13 @@ public class PhoenixInputFormat<T extends DBWritable> extends InputFormat<NullWr
             throws IOException {
         Preconditions.checkNotNull(context);
         try {
+            final String txnScnValue = configuration.get(PhoenixConfigurationUtil.TX_SCN_VALUE);
             final String currentScnValue = configuration.get(PhoenixConfigurationUtil.CURRENT_SCN_VALUE);
             final Properties overridingProps = new Properties();
-            if(currentScnValue != null) {
+            if(txnScnValue==null && currentScnValue!=null) {
                 overridingProps.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, currentScnValue);
             }
+            overridingProps.put(QueryServices.TRANSACTIONS_ENABLED, "true");
             final Connection connection = ConnectionUtil.getInputConnection(configuration, overridingProps);
             final String selectStatement = PhoenixConfigurationUtil.getSelectStatement(configuration);
             Preconditions.checkNotNull(selectStatement);
@@ -116,6 +120,11 @@ public class PhoenixInputFormat<T extends DBWritable> extends InputFormat<NullWr
             final PhoenixStatement pstmt = statement.unwrap(PhoenixStatement.class);
             // Optimize the query plan so that we potentially use secondary indexes            
             final QueryPlan queryPlan = pstmt.optimizeQuery(selectStatement);
+            final Scan scan = queryPlan.getContext().getScan();
+            // since we can't set a scn on connections with txn set TX_SCN attribute so that the max time range is set by BaseScannerRegionObserver 
+            if (txnScnValue!=null) {
+                scan.setAttribute(BaseScannerRegionObserver.TX_SCN, Bytes.toBytes(Long.valueOf(txnScnValue)));
+            }
             // Initialize the query plan so it sets up the parallel scans
             queryPlan.iterator(MapReduceParallelScanGrouper.getInstance());
             return queryPlan;

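Taken together with the observer change, the logic above is: when TX_SCN_VALUE is set (transactional table), skip CURRENT_SCN on the connection — per the comment in the hunk, an SCN can't be set on a connection with transactions — and pin the upper bound on the scan itself. A condensed sketch of that branching:

    String txnScn = configuration.get(PhoenixConfigurationUtil.TX_SCN_VALUE);
    String scn = configuration.get(PhoenixConfigurationUtil.CURRENT_SCN_VALUE);
    Properties overridingProps = new Properties();
    if (txnScn == null && scn != null) {
        overridingProps.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, scn); // non-txn: pin the connection
    }
    // ... connection and query plan are built here, as in the hunk above ...
    if (txnScn != null) {
        // txn: BaseScannerRegionObserver caps the scan's max time range at TX_SCN
        scan.setAttribute(BaseScannerRegionObserver.TX_SCN, Bytes.toBytes(Long.valueOf(txnScn)));
    }
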
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b0122a54/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
index 8a4f963..f5117fc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/IndexTool.java
@@ -65,6 +65,7 @@ import org.apache.phoenix.util.MetaDataUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.SchemaUtil;
+import org.apache.phoenix.util.TransactionUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -191,9 +192,13 @@ public class IndexTool extends Configured implements Tool {
             final PTable pindexTable = PhoenixRuntime.getTable(connection, qIndexTable);
 
             // this is set to ensure index tables remains consistent post population.
-            long indxTimestamp = pindexTable.getTimeStamp();
+            long maxTimeRange = pindexTable.getTimeStamp()+1;
+            if (pdataTable.isTransactional()) {
+                configuration.set(PhoenixConfigurationUtil.TX_SCN_VALUE,
+                    Long.toString(TransactionUtil.convertToNanoseconds(maxTimeRange)));
+            }
             configuration.set(PhoenixConfigurationUtil.CURRENT_SCN_VALUE,
-                Long.toString(indxTimestamp + 1));
+                Long.toString(maxTimeRange));
 
             // check if the index type is LOCAL, if so, derive and set the physicalIndexName that is
             // computed from the qDataTable name.

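A worked sketch of the timestamp handoff above. The assumption (consistent with Tephra's convention) is that TransactionUtil.convertToNanoseconds maps an HBase millisecond timestamp into the transaction manager's timestamp space by multiplying by 1,000,000:

    long maxTimeRange = pindexTable.getTimeStamp() + 1;   // index table timestamp, millis
    if (pdataTable.isTransactional()) {
        // assumed: convertToNanoseconds(t) == t * 1,000,000
        configuration.set(PhoenixConfigurationUtil.TX_SCN_VALUE,
            Long.toString(maxTimeRange * 1000000L));
    }
    configuration.set(PhoenixConfigurationUtil.CURRENT_SCN_VALUE,
        Long.toString(maxTimeRange));

So for an index created at t = 1454968727000 ms, the MR job reads up to 1454968727001 ms on a non-transactional table, or up to 1454968727001000000 in transaction-timestamp space on a transactional one.
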
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b0122a54/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectMapper.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectMapper.java
index 3bc3808..9c64efc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectMapper.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportDirectMapper.java
@@ -80,8 +80,11 @@ public class PhoenixIndexImportDirectMapper extends
             indxWritable.setColumnMetadata(indxTblColumnMetadata);
 
             final Properties overrideProps = new Properties();
-            overrideProps.put(PhoenixRuntime.CURRENT_SCN_ATTRIB,
-                configuration.get(PhoenixConfigurationUtil.CURRENT_SCN_VALUE));
+            String scn = configuration.get(PhoenixConfigurationUtil.CURRENT_SCN_VALUE);
+            String txScnValue = configuration.get(PhoenixConfigurationUtil.TX_SCN_VALUE);
+            if(txScnValue==null) {
+                overrideProps.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, scn);
+            }
             connection = ConnectionUtil.getOutputConnection(configuration, overrideProps);
             connection.setAutoCommit(false);
             // Get BatchSize

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b0122a54/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportMapper.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportMapper.java
index 517ce91..093b93d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportMapper.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/index/PhoenixIndexImportMapper.java
@@ -72,7 +72,11 @@ public class PhoenixIndexImportMapper extends Mapper<NullWritable, PhoenixIndexD
             preUpdateProcessor = PhoenixConfigurationUtil.loadPreUpsertProcessor(configuration);
             indexTableName = PhoenixConfigurationUtil.getPhysicalTableName(configuration);
             final Properties overrideProps = new Properties ();
-            overrideProps.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, configuration.get(PhoenixConfigurationUtil.CURRENT_SCN_VALUE));
+            String scn = configuration.get(PhoenixConfigurationUtil.CURRENT_SCN_VALUE);
+            String txScnValue = configuration.get(PhoenixConfigurationUtil.TX_SCN_VALUE);
+            if(txScnValue==null) {
+                overrideProps.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, scn);
+            }
             connection = ConnectionUtil.getOutputConnection(configuration,overrideProps);
             connection.setAutoCommit(false);
             final String upsertQuery = PhoenixConfigurationUtil.getUpsertStatement(configuration);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b0122a54/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
index 9e29fba..280daa2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
@@ -17,6 +17,8 @@
  */
 package org.apache.phoenix.mapreduce.util;
 
+import static org.apache.commons.lang.StringUtils.isNotEmpty;
+
 import java.io.IOException;
 import java.sql.Connection;
 import java.sql.SQLException;
@@ -45,8 +47,6 @@ import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 
-import static org.apache.commons.lang.StringUtils.isNotEmpty;
-
 /**
 * A utility class to set properties on the {@link Configuration} instance.
  * Used as part of Map Reduce job configuration.
@@ -87,6 +87,8 @@ public final class PhoenixConfigurationUtil {
     
     public static final String CURRENT_SCN_VALUE = "phoenix.mr.currentscn.value";
     
+    public static final String TX_SCN_VALUE = "phoenix.mr.txscn.value";
+    
     /** Configuration key for the class name of an ImportPreUpsertKeyValueProcessor */
     public static final String UPSERT_HOOK_CLASS_CONFKEY = "phoenix.mapreduce.import.kvprocessor";
 

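PhoenixConfigurationUtil gains the TX_SCN_VALUE key; the import shuffle is cosmetic. A hedged sketch of a read-side helper in the style of this class; getTxScn is hypothetical and not part of the patch, only the constant is:

    import static org.apache.commons.lang.StringUtils.isNotEmpty;

    import org.apache.hadoop.conf.Configuration;

    // Hypothetical accessor illustrating how the new key could be consumed.
    public final class TxScnSketch {
        public static final String TX_SCN_VALUE = "phoenix.mr.txscn.value";

        static Long getTxScn(Configuration conf) {
            String value = conf.get(TX_SCN_VALUE);
            return isNotEmpty(value) ? Long.valueOf(value) : null;
        }
    }
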
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b0122a54/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index e8d995c..ac2062a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -1013,7 +1013,6 @@ public class MetaDataClient {
     private MutationState buildIndex(PTable index, TableRef dataTableRef) throws SQLException {
         AlterIndexStatement indexStatement = null;
         boolean wasAutoCommit = connection.getAutoCommit();
-        connection.rollback();
         try {
             connection.setAutoCommit(true);
             MutationPlan mutationPlan;

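buildIndex no longer issues an unconditional rollback before toggling auto-commit; on a transactional connection that rollback would presumably discard in-flight transaction state the index build depends on. A sketch of the surrounding shape, with a hypothetical placeholder for the real mutation plan and the restore-in-finally assumed from the wasAutoCommit variable:

    import java.sql.Connection;
    import java.sql.SQLException;

    // Sketch of the pattern around this hunk: save auto-commit, force it on
    // for the build, restore it afterwards. runIndexBuild is a stand-in.
    final class BuildIndexSketch {
        static void buildIndex(Connection connection) throws SQLException {
            boolean wasAutoCommit = connection.getAutoCommit();
            try {
                connection.setAutoCommit(true);
                // runIndexBuild(connection); // hypothetical placeholder
            } finally {
                connection.setAutoCommit(wasAutoCommit);
            }
        }
    }
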
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b0122a54/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
index 951bfce..a67a530 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/query/BaseTest.java
@@ -17,7 +17,6 @@
  */
 package org.apache.phoenix.query;
 
-import static org.apache.phoenix.query.BaseTest.setUpConfigForMiniCluster;
 import static org.apache.phoenix.util.PhoenixRuntime.CURRENT_SCN_ATTRIB;
 import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL;
 import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_TERMINATOR;
@@ -119,12 +118,6 @@ import java.util.concurrent.TimeoutException;
 
 import javax.annotation.Nonnull;
 
-import co.cask.tephra.TransactionManager;
-import co.cask.tephra.TxConstants;
-import co.cask.tephra.distributed.TransactionService;
-import co.cask.tephra.metrics.TxMetricsCollector;
-import co.cask.tephra.persist.InMemoryTransactionStateStorage;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
@@ -179,6 +172,12 @@ import org.junit.rules.TemporaryFolder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import co.cask.tephra.TransactionManager;
+import co.cask.tephra.TxConstants;
+import co.cask.tephra.distributed.TransactionService;
+import co.cask.tephra.metrics.TxMetricsCollector;
+import co.cask.tephra.persist.InMemoryTransactionStateStorage;
+
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
@@ -508,6 +507,7 @@ public abstract class BaseTest {
     
     protected static String url;
     protected static PhoenixTestDriver driver;
+    protected static PhoenixDriver realDriver;
     protected static boolean clusterInitialized = false;
     private static HBaseTestingUtility utility;
     protected static final Configuration config = HBaseConfiguration.create(); 
@@ -588,9 +588,16 @@ public abstract class BaseTest {
                 assertTrue(destroyDriver(driver));
             } finally {
                 driver = null;
-                teardownTxManager();
             }
         }
+        if (realDriver != null) {
+            try {
+                assertTrue(destroyDriver(realDriver));
+            } finally {
+                realDriver = null;
+            }
+        }
+        teardownTxManager();
     }
     
     protected static void dropNonSystemTables() throws Exception {
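
Teardown now handles two drivers and always stops the transaction manager, rather than only when a test driver existed. A self-contained illustration of that shape (the Object fields stand in for the two driver references):

    // Each driver is destroyed in its own try/finally so the reference is
    // cleared even on failure, and the transaction-manager teardown runs
    // regardless of which drivers were registered.
    public final class TeardownSketch {
        static Object testDriver = new Object();
        static Object realDriver = new Object();

        public static void main(String[] args) {
            if (testDriver != null) {
                try {
                    System.out.println("destroying test driver");
                } finally {
                    testDriver = null;
                }
            }
            if (realDriver != null) {
                try {
                    System.out.println("destroying real driver");
                } finally {
                    realDriver = null;
                }
            }
            System.out.println("tearing down tx manager"); // now unconditional
        }
    }
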
@@ -607,7 +614,11 @@ public abstract class BaseTest {
         } finally {
             try {
                 if (utility != null) {
-                    utility.shutdownMiniCluster();
+                    try {
+                        utility.shutdownMiniMapReduceCluster();
+                    } finally {
+                        utility.shutdownMiniCluster();
+                    }
                 }
             } finally {
                 utility = null;
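
The mini MapReduce cluster is shut down before the HBase mini cluster, and the nested try/finally guarantees the outer shutdown still runs if the inner one throws. The same pattern in isolation, with prints standing in for the two shutdown calls:

    public final class ShutdownOrderSketch {
        public static void main(String[] args) {
            try {
                // Inner resource stops first; a failure here cannot prevent
                // the outer cluster from stopping.
                System.out.println("shutting down mini MapReduce cluster");
            } finally {
                System.out.println("shutting down mini HBase cluster");
            }
        }
    }
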
@@ -623,12 +634,33 @@ public abstract class BaseTest {
     protected static void setUpTestDriver(ReadOnlyProps serverProps, ReadOnlyProps clientProps) throws Exception {
         String url = checkClusterInitialized(serverProps);
         if (driver == null) {
-            driver = initAndRegisterDriver(url, clientProps);
+            driver = initAndRegisterTestDriver(url, clientProps);
             if (clientProps.getBoolean(QueryServices.TRANSACTIONS_ENABLED, QueryServicesOptions.DEFAULT_TRANSACTIONS_ENABLED)) {
                 setupTxManager();
             }
         }
     }
+    
+    protected static void setUpRealDriver(ReadOnlyProps serverProps, ReadOnlyProps clientProps) throws Exception {
+        if (!clusterInitialized) {
+            setUpConfigForMiniCluster(config, serverProps);
+            utility = new HBaseTestingUtility(config);
+            try {
+                utility.startMiniCluster(NUM_SLAVES_BASE);
+                utility.startMiniMapReduceCluster();
+                url = QueryUtil.getConnectionUrl(new Properties(), utility.getConfiguration());
+            } catch (Throwable t) {
+                throw new RuntimeException(t);
+            }
+            clusterInitialized = true;
+        }
+        Class.forName(PhoenixDriver.class.getName());
+        realDriver = PhoenixDriver.INSTANCE;
+        DriverManager.registerDriver(realDriver);
+        if (clientProps.getBoolean(QueryServices.TRANSACTIONS_ENABLED, QueryServicesOptions.DEFAULT_TRANSACTIONS_ENABLED)) {
+            setupTxManager();
+        }
+    }
 
     private static boolean isDistributedClusterModeEnabled(Configuration conf) {
         boolean isDistributedCluster = false;
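
setUpRealDriver boots the mini cluster plus a mini MapReduce cluster and registers the production PhoenixDriver.INSTANCE rather than a PhoenixTestDriver, presumably so that connections opened inside MapReduce tasks behave like real client connections. A hypothetical integration test opting in; the class name and empty props are illustrative:

    import org.apache.phoenix.query.BaseTest;
    import org.apache.phoenix.util.ReadOnlyProps;
    import org.junit.BeforeClass;

    // Hypothetical MapReduce-driven IT using the real driver.
    public class ExampleMapReduceIT extends BaseTest {
        @BeforeClass
        public static void doSetup() throws Exception {
            setUpRealDriver(ReadOnlyProps.EMPTY_PROPS, ReadOnlyProps.EMPTY_PROPS);
        }
    }
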
@@ -739,7 +771,7 @@ public abstract class BaseTest {
      * Create a {@link PhoenixTestDriver} and register it.
      * @return an initialized and registered {@link PhoenixTestDriver} 
      */
-    public static PhoenixTestDriver initAndRegisterDriver(String url, ReadOnlyProps props) throws Exception {
+    public static PhoenixTestDriver initAndRegisterTestDriver(String url, ReadOnlyProps props) throws Exception {
         PhoenixTestDriver newDriver = new PhoenixTestDriver(props);
         DriverManager.registerDriver(newDriver);
         Driver oldDriver = DriverManager.getDriver(url); 
@@ -1804,7 +1836,7 @@ public abstract class BaseTest {
         assertEquals(expectedCount, count);
     }
     
-    public HBaseTestingUtility getUtility() {
+    public static HBaseTestingUtility getUtility() {
         return utility;
     }