Posted to commits@phoenix.apache.org by ma...@apache.org on 2015/12/11 03:43:47 UTC

[09/52] [abbrv] phoenix git commit: PHOENIX-1674 Snapshot isolation transaction support through Tephra (James Taylor, Thomas D'Silva)

PHOENIX-1674 Snapshot isolation transaction support through Tephra (James Taylor, Thomas D'Silva)


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/cc9929b5
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/cc9929b5
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/cc9929b5

Branch: refs/heads/calcite
Commit: cc9929b5c54eb979294df70929aa6eb781748b46
Parents: 76a8d18
Author: Thomas D'Silva <td...@salesforce.com>
Authored: Mon Nov 23 21:24:01 2015 -0800
Committer: Thomas D'Silva <td...@salesforce.com>
Committed: Mon Nov 23 21:24:01 2015 -0800

----------------------------------------------------------------------
 bin/tephra                                      | 310 ++++++
 bin/tephra-env.sh                               |  55 ++
 phoenix-assembly/src/build/client.xml           |   4 +
 phoenix-assembly/src/build/server-with-txns.xml |  48 +
 .../src/build/server-without-antlr.xml          |  46 +
 phoenix-assembly/src/build/server.xml           |  10 +-
 phoenix-core/pom.xml                            |  24 +
 .../ConnectionQueryServicesTestImpl.java        |   4 +-
 .../StatsCollectorWithSplitsAndMultiCFIT.java   |  10 +-
 .../phoenix/end2end/index/ImmutableIndexIT.java |  24 +-
 .../end2end/index/IndexExpressionIT.java        |   5 +-
 .../apache/phoenix/end2end/index/IndexIT.java   | 112 +--
 .../end2end/index/MutableIndexFailureIT.java    | 490 ++++++----
 .../phoenix/end2end/index/MutableIndexIT.java   |  68 +-
 .../end2end/index/txn/MutableRollbackIT.java    | 510 ++++++++++
 .../phoenix/end2end/index/txn/RollbackIT.java   | 167 ++++
 .../end2end/index/txn/TxWriteFailureIT.java     | 214 +++++
 .../apache/phoenix/execute/PartialCommitIT.java | 149 +--
 .../org/apache/phoenix/rpc/UpdateCacheIT.java   | 108 +--
 .../phoenix/rpc/UpdateCacheWithScnIT.java       |  41 +
 .../org/apache/phoenix/tx/TransactionIT.java    | 537 +++++++++++
 .../org/apache/phoenix/tx/TxCheckpointIT.java   | 378 ++++++++
 .../apache/phoenix/tx/TxPointInTimeQueryIT.java |  62 ++
 .../phoenix/cache/IndexMetaDataCache.java       |   9 +
 .../apache/phoenix/cache/ServerCacheClient.java |   4 +-
 .../org/apache/phoenix/cache/TenantCache.java   |   2 +-
 .../apache/phoenix/cache/TenantCacheImpl.java   |  13 +-
 .../phoenix/compile/CreateTableCompiler.java    |   4 +-
 .../apache/phoenix/compile/DeleteCompiler.java  |  16 +-
 .../apache/phoenix/compile/FromCompiler.java    |   9 +-
 .../apache/phoenix/compile/JoinCompiler.java    |   2 +-
 .../apache/phoenix/compile/PostDDLCompiler.java |  10 +-
 .../phoenix/compile/StatementContext.java       |   3 +-
 .../compile/TupleProjectionCompiler.java        |   4 +-
 .../apache/phoenix/compile/UnionCompiler.java   |   2 +-
 .../apache/phoenix/compile/UpsertCompiler.java  |  13 +-
 .../coprocessor/BaseScannerRegionObserver.java  |   9 +-
 .../coprocessor/MetaDataEndpointImpl.java       |  21 +-
 .../phoenix/coprocessor/MetaDataProtocol.java   |   6 +-
 .../phoenix/coprocessor/ScanRegionObserver.java |  12 +-
 .../coprocessor/ServerCachingEndpointImpl.java  |   8 +-
 .../coprocessor/ServerCachingProtocol.java      |   5 +-
 .../UngroupedAggregateRegionObserver.java       |   6 +
 .../coprocessor/generated/PTableProtos.java     | 103 +-
 .../generated/ServerCachingProtos.java          | 120 ++-
 .../phoenix/exception/SQLExceptionCode.java     |  17 +
 .../apache/phoenix/execute/BaseQueryPlan.java   |  40 +-
 .../apache/phoenix/execute/DelegateHTable.java  | 300 ++++++
 .../apache/phoenix/execute/HashJoinPlan.java    |  27 +-
 .../apache/phoenix/execute/MutationState.java   | 932 ++++++++++++++-----
 .../org/apache/phoenix/execute/ScanPlan.java    |   2 +-
 .../apache/phoenix/index/IndexMaintainer.java   |   4 +-
 .../phoenix/index/IndexMetaDataCacheClient.java |  12 +-
 .../index/IndexMetaDataCacheFactory.java        |  19 +-
 .../index/PhoenixIndexFailurePolicy.java        |   2 +-
 .../phoenix/index/PhoenixIndexMetaData.java     |  13 +
 .../index/PhoenixTransactionalIndexer.java      | 505 ++++++++++
 .../phoenix/iterate/BaseResultIterators.java    |   5 +
 .../phoenix/iterate/ChunkedResultIterator.java  |  25 +-
 .../phoenix/iterate/ParallelIterators.java      |   2 +-
 .../apache/phoenix/iterate/SerialIterators.java |   2 +-
 .../phoenix/iterate/TableResultIterator.java    |  28 +-
 .../apache/phoenix/jdbc/PhoenixConnection.java  |  77 +-
 .../phoenix/jdbc/PhoenixDatabaseMetaData.java   |   3 +
 .../org/apache/phoenix/jdbc/PhoenixDriver.java  |   2 +-
 .../phoenix/jdbc/PhoenixEmbeddedDriver.java     |   4 +
 .../apache/phoenix/jdbc/PhoenixStatement.java   |  15 +-
 .../apache/phoenix/join/HashCacheClient.java    |   3 +-
 .../apache/phoenix/join/HashCacheFactory.java   |   2 +-
 .../phoenix/mapreduce/PhoenixRecordReader.java  |   2 +-
 .../index/PhoenixIndexImportDirectMapper.java   |   2 +-
 .../apache/phoenix/optimize/QueryOptimizer.java |  15 +-
 .../phoenix/query/ConnectionQueryServices.java  |   6 +-
 .../query/ConnectionQueryServicesImpl.java      | 621 +++++++++---
 .../query/ConnectionlessQueryServicesImpl.java  |  58 +-
 .../query/DelegateConnectionQueryServices.java  |  27 +-
 .../apache/phoenix/query/MetaDataMutated.java   |   7 +-
 .../apache/phoenix/query/QueryConstants.java    |  23 +-
 .../org/apache/phoenix/query/QueryServices.java |   3 +
 .../phoenix/query/QueryServicesOptions.java     |   5 +-
 .../apache/phoenix/schema/DelegateTable.java    |   8 +-
 .../apache/phoenix/schema/MetaDataClient.java   | 376 +++++---
 .../org/apache/phoenix/schema/PMetaData.java    |   4 +-
 .../apache/phoenix/schema/PMetaDataImpl.java    | 107 +--
 .../java/org/apache/phoenix/schema/PTable.java  |   3 +-
 .../org/apache/phoenix/schema/PTableImpl.java   |  73 +-
 .../org/apache/phoenix/schema/PTableRef.java    |  65 ++
 .../apache/phoenix/schema/TableProperty.java    |   5 +-
 .../org/apache/phoenix/schema/TableRef.java     |  12 +-
 .../phoenix/trace/PhoenixMetricsSink.java       |   9 +-
 .../java/org/apache/phoenix/util/IndexUtil.java |  29 +-
 .../org/apache/phoenix/util/MetaDataUtil.java   |   7 +
 .../java/org/apache/phoenix/util/ScanUtil.java  |   2 +-
 .../org/apache/phoenix/util/SchemaUtil.java     |  29 +-
 .../org/apache/phoenix/util/ServerUtil.java     |   5 +-
 .../apache/phoenix/util/TransactionUtil.java    | 105 +++
 .../phoenix/compile/QueryCompilerTest.java      |   2 +-
 .../phoenix/compile/ViewCompilerTest.java       |  29 +-
 .../phoenix/execute/CorrelatePlanTest.java      |   2 +-
 .../phoenix/filter/SkipScanBigFilterTest.java   |   3 +-
 .../covered/CoveredIndexCodecForTesting.java    |  67 +-
 .../phoenix/index/IndexMaintainerTest.java      |   2 +-
 .../apache/phoenix/jdbc/PhoenixTestDriver.java  |   4 +-
 .../java/org/apache/phoenix/query/BaseTest.java | 130 ++-
 .../phoenix/query/ConnectionlessTest.java       |   5 +-
 .../phoenix/schema/PMetaDataImplTest.java       |   4 +-
 .../java/org/apache/phoenix/util/TestUtil.java  |  38 +-
 phoenix-flume/pom.xml                           |   7 +
 phoenix-pig/pom.xml                             |   7 +
 phoenix-protocol/src/main/PTable.proto          |   1 +
 .../src/main/ServerCachingService.proto         |   1 +
 phoenix-server/pom.xml                          |   7 +
 pom.xml                                         |   5 +-
 113 files changed, 6349 insertions(+), 1371 deletions(-)
----------------------------------------------------------------------
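
Before the per-file diffs, a sketch of what this feature looks like from the
client side. The new suites (TransactionIT, MutableRollbackIT, RollbackIT,
TxCheckpointIT) exercise tables declared with TRANSACTIONAL=true, where
commit() and rollback() on the JDBC connection map onto Tephra transactions
with snapshot isolation. A minimal, illustrative session; the
phoenix.transactions.enabled property name and the connection URL are
assumptions for illustration, not quoted from this patch:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.Statement;
    import java.util.Properties;

    public class TxnDemo {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            // Assumed client-side switch; QueryServices gains transaction
            // options in this patch, but the name shown here is illustrative.
            props.setProperty("phoenix.transactions.enabled", "true");
            try (Connection conn =
                    DriverManager.getConnection("jdbc:phoenix:localhost", props)) {
                conn.setAutoCommit(false);
                Statement stmt = conn.createStatement();
                stmt.execute("CREATE TABLE IF NOT EXISTS DEMO"
                        + " (K VARCHAR PRIMARY KEY, V VARCHAR) TRANSACTIONAL=true");
                stmt.execute("UPSERT INTO DEMO VALUES ('a', '1')");
                conn.rollback(); // Tephra aborts; nothing becomes visible
                stmt.execute("UPSERT INTO DEMO VALUES ('a', '2')");
                conn.commit();   // row and any index updates commit atomically
            }
        }
    }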


http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc9929b5/bin/tephra
----------------------------------------------------------------------
diff --git a/bin/tephra b/bin/tephra
new file mode 100644
index 0000000..54fe301
--- /dev/null
+++ b/bin/tephra
@@ -0,0 +1,310 @@
+#!/bin/sh
+
+#
+# Copyright © 2014 Cask Data, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not
+# use this file except in compliance with the License. You may obtain a copy of
+# the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations under
+# the License.
+#
+
+# Source function library. used for "status" use case
+if [ -f "/etc/rc.d/init.d/functions" ]; then
+  PLATFORM="RHEL"
+  . /etc/rc.d/init.d/functions
+elif [ -f /lib/lsb/init-functions ] ; then
+  PLATFORM="UBUNTU"
+  . /lib/lsb/init-functions
+else
+  PLATFORM="UNSUPPORTED"
+fi
+
+# Attempt to set APP_HOME
+# Resolve links: $0 may be a link
+PRG="$0"
+bin=`dirname "${BASH_SOURCE-$0}"`
+bin=`cd "$bin"; pwd`
+lib="$bin"/../lib
+conf="$bin"/../conf
+script=`basename $0`
+
+# Resolve relative symlinks
+while [ -h "$PRG" ] ; do
+    ls=`ls -ld "$PRG"`
+    link=`expr "$ls" : '.*-> \(.*\)$'`
+    if expr "$link" : '/.*' > /dev/null; then
+        PRG="$link"
+    else
+        PRG=`dirname "$PRG"`"/$link"
+    fi
+done
+SAVED="`pwd`"
+cd "`dirname \"$PRG\"`/.."
+APP_HOME="`pwd -P`"
+APP=`basename $0`
+
+# Load component common environment file too
+. $bin/tephra-env.sh
+
+pid=$PID_DIR/tephra-service-${IDENT_STRING}.pid
+
+# In other environment, the jars are expected to be in <HOME>/lib directory.
+# Load all the jar files. Not ideal, but we need to load only the things that
+# is needed by this script.
+if [ "$CLASSPATH" = "" ]; then
+  CLASSPATH=${lib}/*
+else
+  CLASSPATH=$CLASSPATH:${lib}/*
+fi
+
+# Load the configuration too.
+if [ -d "$conf" ]; then
+  CLASSPATH=$CLASSPATH:"$conf"/
+fi
+
+# Set Log location
+if [ ! -e $LOG_DIR ]; then
+  mkdir -p $LOG_DIR;
+fi
+export LOG_PREFIX="tephra-service-$IDENT_STRING-$HOSTNAME"
+export LOGFILE=$LOG_PREFIX.log
+loglog="${LOG_DIR}/${LOGFILE}"
+
+# set the classpath to include hadoop and hbase dependencies
+set_classpath()
+{
+  COMP_HOME=$1
+  if [ -n "$HBASE_HOME" ]; then
+    HBASE_CP=`$HBASE_HOME/bin/hbase classpath`
+  elif [ `which hbase` ]; then
+    HBASE_CP=`hbase classpath`
+  fi
+
+  export HBASE_CP
+
+  if [ -n "$HBASE_CP" ]; then
+    CP=$COMP_HOME/phoenix-assembly/target/*:$HBASE_CP:$EXTRA_CLASSPATH
+  else
+    # assume Hadoop/HBase libs are included via EXTRA_CLASSPATH
+    echo "WARN: could not find Hadoop and HBase libraries"
+    CP=$COMP_HOME/phoenix-assembly/target/*:$EXTRA_CLASSPATH
+  fi
+
+  # Setup classpaths.
+  if [ -n "$CLASSPATH" ]; then
+    CLASSPATH=$CLASSPATH:$CP
+  else
+    CLASSPATH=$CP
+  fi
+
+  export CLASSPATH
+}
+
+# Attempts to find JAVA in few ways.
+set_java ()
+{
+  # Determine the Java command to use to start the JVM.
+  if [ -n "$JAVA_HOME" ] ; then
+    if [ -x "$JAVA_HOME/jre/sh/java" ] ; then
+        # IBM's JDK on AIX uses strange locations for the executables
+        export JAVA="$JAVA_HOME/jre/sh/java"
+    else
+        export JAVA="$JAVA_HOME/bin/java"
+    fi
+    if [ ! -x "$JAVA" ] ; then
+        echo "ERROR: JAVA_HOME is set to an invalid directory: $JAVA_HOME
+
+Please set the JAVA_HOME variable in your environment to match the
+location of your Java installation." >&2
+        exit 1
+    fi
+  else
+    export JAVA="java"
+    which java >/dev/null 2>&1 || { echo "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH.
+Please set the JAVA_HOME variable in your environment to match the
+location of your Java installation." >&2 ; exit 1; }
+  fi
+}
+
+# checks if there exists a PID that is already running. return 0 idempotently
+check_before_start()
+{
+  if [ ! -d "$PID_DIR" ]; then
+    mkdir -p "$PID_DIR"
+  fi
+  if [ -f $pid ]; then
+    if kill -0 `cat $pid` > /dev/null 2>&1; then
+      #echo "$APP $SERVICE running as process `cat $pid`. Stop it first."
+      echo "$APP running as process `cat $pid`. Stop it first."
+      exit 0
+    fi
+  fi
+}
+
+# Set Niceness
+if [ "$NICENESS" = "" ]; then
+ export NICENESS=0
+fi
+
+start() {
+  # Setup classpaths.
+  set_classpath $APP_HOME
+
+  # sets the JAVA variable.
+  set_java
+
+  check_before_start
+
+  echo "`date` Starting $APP service on `hostname`"
+  echo "`date` Starting $APP service on `hostname`" >> $loglog
+  echo "`ulimit -a`" >> $loglog 2>&1
+
+  export MAIN_CLASS="co.cask.tephra.TransactionServiceMain"
+  echo "Running class $MAIN_CLASS"
+  echo "Command: " "$JAVA" $OPTS -cp $CLASSPATH $JAVA_HEAPMAX $MAIN_CLASS >>$loglog
+  nohup nice -n $NICENESS "$JAVA" $OPTS -cp $CLASSPATH $JAVA_HEAPMAX ${MAIN_CLASS} </dev/null >>$loglog 2>&1 &
+  echo $! >$pid
+}
+
+stop() {
+  if [ -f $pid ]; then
+    pidToKill=`cat $pid`
+    # kill -0 == see if the PID exists
+    if kill -0 $pidToKill > /dev/null 2>&1; then
+      echo -n stopping $command
+      echo "`date` Terminating $command" >> $loglog
+      kill $pidToKill > /dev/null 2>&1
+      while kill -0 $pidToKill > /dev/null 2>&1;
+      do
+        echo -n "."
+        sleep 1;
+      done
+      rm $pid
+      echo
+    else
+      retval=$?
+      echo nothing to stop because kill -0 of pid $pidToKill failed with status $retval
+    fi
+    rm -f $pid
+  else
+    echo nothing to stop because no pid file $pid
+  fi
+}
+
+restart() {
+    stop
+    start
+}
+
+condrestart(){
+    case "$PLATFORM" in
+      "RHEL")
+        rh_status > /dev/null 2>&1
+        retval=$?
+      ;;
+      "UBUNTU")
+        ub_status > /dev/null 2>&1
+        retval=$?
+      ;;
+      "UNSUPPORTED")
+        echo "condrestart is not supported on platform"
+        exit 1
+      ;;
+    esac
+
+  if [[ $retval -eq 0 ]]; then
+    restart
+  fi
+}
+
+rh_status() {
+    echo "checking status"
+    # call sourced status function
+    status -p $pid 
+}
+
+ub_status() {
+    echo "checking status"
+    # call sourced status function
+    status_of_proc -p $pid "$0" "$APP"
+}
+
+# Executes a specific class' main method with the classpath and environment setup
+run() {
+    classname=$1
+    shift
+    if [ -z "$classname" ]; then
+        echo "ERROR: No classname was given!"
+        echo "Usage: $0 run <fully qualified classname> [arguments]"
+        exit 1
+    fi
+    # Setup classpaths.
+    set_classpath $APP_HOME
+
+    # sets the JAVA variable.
+    set_java
+
+    echo "Running class $classname"
+    "$JAVA" $OPTS -cp $CLASSPATH $JAVA_HEAPMAX $classname $@
+}
+
+case "$1" in
+  start)
+    $1  
+  ;;
+  
+  stop)
+    $1
+  ;;
+
+  restart)
+    $1
+  ;;
+
+  condrestart)
+    $1
+  ;;
+
+  status)
+    case "$PLATFORM" in
+      "RHEL")
+        rh_status
+      ;;
+      "UBUNTU")
+        ub_status
+      ;;
+      "UNSUPPORTED")
+        echo "status is not supported on platform"
+        exit 1
+      ;;
+    esac 
+  ;;
+
+  classpath)
+    set_classpath $APP_HOME
+    set_java
+    echo $CLASSPATH
+  ;;
+
+  run)
+    shift
+    run $@
+  ;;
+
+  *)
+    echo "Usage: $0 {start|stop|restart|status|run}"
+    exit 1
+  ;;
+
+
+esac
+exit $? 
+
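
The script above is a conventional init-style wrapper around Tephra's
TransactionServiceMain. A typical session, assuming HBASE_HOME is set so that
set_classpath can resolve the HBase jars (commands illustrative):

    $ export HBASE_HOME=/usr/lib/hbase   # or ensure `hbase` is on the PATH
    $ bin/tephra start                   # daemonizes; pid in $PID_DIR, log in $LOG_DIR
    $ bin/tephra status                  # delegates to the platform init functions
    $ bin/tephra classpath               # prints the resolved CLASSPATH for debugging
    $ bin/tephra stop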

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc9929b5/bin/tephra-env.sh
----------------------------------------------------------------------
diff --git a/bin/tephra-env.sh b/bin/tephra-env.sh
new file mode 100644
index 0000000..6906b44
--- /dev/null
+++ b/bin/tephra-env.sh
@@ -0,0 +1,55 @@
+#!/bin/sh
+
+#
+# Copyright © 2014 Cask Data, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not
+# use this file except in compliance with the License. You may obtain a copy of
+# the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations under
+# the License.
+#
+
+#
+# Common environment settings for Tephra.
+# Uncomment the lines below, where needed, and modify to adapt to your environment.
+#
+
+# A string representing this instance of the Tephra server. $USER by default.
+export IDENT_STRING=$USER
+
+# Where log files are stored.  /var/log by default.
+export LOG_DIR=/tmp/tephra-$IDENT_STRING
+
+# The directory where pid files are stored. /var/run by default.
+export PID_DIR=/tmp
+
+# Add any extra classes to the classpath
+# export EXTRA_CLASSPATH
+
+# Set the JVM heap size
+# export JAVA_HEAPMAX=-Xmx2048m
+
+# Additional runtime options
+#
+# GC logging options.
+# Uncomment the following two lines, making any desired changes, to enable GC logging output
+# export GC_LOG_OPTS="-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -Xloggc:server-gc.log -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=2 -XX:GCLogFileSize=50M"
+# export OPTS="$OPTS $GC_LOG_OPTS"
+#
+# JMX options.
+# Uncomment the following two lines, making any desired changes, to enable remote JMX connectivity
+# export JMX_OPTS="-Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.port=13001"
+# export OPTS="$OPTS $JMX_OPTS"
+
+# Extra Java runtime options.
+# Below are what we set by default.  May only work with SUN JVM.
+# For more on why as well as other possible settings,
+# see http://wiki.apache.org/hadoop/PerformanceTuning
+export OPTS="$OPTS -XX:+UseConcMarkSweepGC"
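
The defaults above keep logs and pid files under /tmp, which suits test runs;
for a longer-lived deployment you would typically override them, for example
(paths and heap size illustrative):

    # illustrative overrides for a persistent installation
    export LOG_DIR=/var/log/tephra
    export PID_DIR=/var/run/tephra
    export JAVA_HEAPMAX=-Xmx2048m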

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc9929b5/phoenix-assembly/src/build/client.xml
----------------------------------------------------------------------
diff --git a/phoenix-assembly/src/build/client.xml b/phoenix-assembly/src/build/client.xml
index 0e1e1f6..c1fe861 100644
--- a/phoenix-assembly/src/build/client.xml
+++ b/phoenix-assembly/src/build/client.xml
@@ -59,6 +59,10 @@
         <include>org.apache.htrace:htrace-core</include>
         <include>io.netty:netty-all</include>
         <include>commons-codec:commons-codec</include>
+        <include>co.cask.tephra:tephra*</include>
+        <include>org.apache.twill:twill*</include>
+        <include>org.apache.thrift:*</include>
+        <include>com.google.code.gson:gson*</include>
       </includes>
     </dependencySet>
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc9929b5/phoenix-assembly/src/build/server-with-txns.xml
----------------------------------------------------------------------
diff --git a/phoenix-assembly/src/build/server-with-txns.xml b/phoenix-assembly/src/build/server-with-txns.xml
new file mode 100644
index 0000000..2808a8c
--- /dev/null
+++ b/phoenix-assembly/src/build/server-with-txns.xml
@@ -0,0 +1,48 @@
+<?xml version='1.0'?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+
+<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
+  <!-- build the phoenix server side jar, that includes phoenix-hadoopX-compat, phoenix-hadoop-compat and antlr -->
+  <id>server-with-txns</id>
+  <formats>
+    <format>jar</format>
+  </formats>
+  <includeBaseDirectory>false</includeBaseDirectory>
+  <dependencySets>
+    <dependencySet>
+      <outputDirectory>/</outputDirectory>
+      <unpack>true</unpack>
+      <includes>
+        <include>org.apache.phoenix:phoenix-core</include>
+        <include>org.iq80.snappy:snappy</include>
+        <include>org.antlr:antlr*</include>
+        <include>co.cask.tephra:tephra*</include>
+        <include>com.google.code.gson:gson</include>
+	<include>com.google.inject.extensions:guice-assistedinject</include>
+	<include>it.unimi.dsi:fastutil</include>
+	<include>io.dropwizard.metrics:metrics-core</include>
+      </includes>
+    </dependencySet>
+  </dependencySets>
+</assembly>
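
Assuming the standard assembly-plugin wiring in phoenix-assembly, this
descriptor yields an extra server jar whose classifier matches the assembly
id, along the lines of (exact file name illustrative):

    $ mvn clean package -DskipTests
    $ ls phoenix-assembly/target/*server-with-txns*.jar
    phoenix-assembly/target/phoenix-<version>-server-with-txns.jar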

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc9929b5/phoenix-assembly/src/build/server-without-antlr.xml
----------------------------------------------------------------------
diff --git a/phoenix-assembly/src/build/server-without-antlr.xml b/phoenix-assembly/src/build/server-without-antlr.xml
new file mode 100644
index 0000000..33cf0b7
--- /dev/null
+++ b/phoenix-assembly/src/build/server-without-antlr.xml
@@ -0,0 +1,46 @@
+<?xml version='1.0'?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+
+<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
+  <!-- build the phoenix server side jar, that includes phoenix-hadoopX-compat and phoenix-hadoop-compat. -->
+  <id>server-without-antlr</id>
+  <formats>
+    <format>jar</format>
+  </formats>
+  <includeBaseDirectory>false</includeBaseDirectory>
+  <dependencySets>
+    <dependencySet>
+      <outputDirectory>/</outputDirectory>
+      <unpack>true</unpack>
+      <includes>
+        <include>org.apache.phoenix:phoenix-core</include>
+        <include>org.iq80.snappy:snappy</include>
+        <include>co.cask.tephra:tephra*</include>
+        <include>com.google.code.gson:gson</include>
+        <include>org.jruby.joni:joni</include>
+        <include>org.jruby.jcodings:jcodings</include>
+      </includes>
+    </dependencySet>
+  </dependencySets>
+</assembly>

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc9929b5/phoenix-assembly/src/build/server.xml
----------------------------------------------------------------------
diff --git a/phoenix-assembly/src/build/server.xml b/phoenix-assembly/src/build/server.xml
index 78a4b1f..141f97b 100644
--- a/phoenix-assembly/src/build/server.xml
+++ b/phoenix-assembly/src/build/server.xml
@@ -36,17 +36,13 @@
       <includes>
         <include>org.apache.phoenix:phoenix-core</include>
         <include>org.iq80.snappy:snappy</include>
+	<include>org.antlr:antlr*</include>
+        <include>co.cask.tephra:tephra*</include>
+        <include>com.google.code.gson:gson</include>
         <include>org.jruby.joni:joni</include>
         <include>org.jruby.jcodings:jcodings</include>
 	<include>joda-time:joda-time</include>
       </includes>
     </dependencySet>
-    <dependencySet>
-      <unpack>true</unpack>
-      <outputDirectory>/</outputDirectory>
-      <includes>
-        <include>org.antlr:antlr*</include>
-      </includes>
-    </dependencySet>
   </dependencySets>
 </assembly>

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc9929b5/phoenix-core/pom.xml
----------------------------------------------------------------------
diff --git a/phoenix-core/pom.xml b/phoenix-core/pom.xml
index 2bb7466..5c73ed4 100644
--- a/phoenix-core/pom.xml
+++ b/phoenix-core/pom.xml
@@ -224,6 +224,30 @@
   </build>
 
   <dependencies>
+    <!-- Transaction dependencies -->
+    <dependency>
+      <groupId>co.cask.tephra</groupId>
+      <artifactId>tephra-api</artifactId>
+      <version>${tephra.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>co.cask.tephra</groupId>
+      <artifactId>tephra-core</artifactId>
+      <version>${tephra.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>co.cask.tephra</groupId>
+      <artifactId>tephra-core</artifactId>
+      <type>test-jar</type>
+      <version>${tephra.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>co.cask.tephra</groupId>
+      <artifactId>tephra-hbase-compat-1.1</artifactId>
+      <version>${tephra.version}</version>
+    </dependency>
+  
     <!-- Make sure we have all the antlr dependencies -->
     <dependency>
       <groupId>org.antlr</groupId>
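
All four Tephra artifacts reference ${tephra.version}, which the top-level
pom.xml (also touched by this commit, per the diffstat) is expected to define.
A sketch of that property; the version number is illustrative, not quoted
from this patch:

    <properties>
      <!-- illustrative value; the real one is set in the root pom.xml -->
      <tephra.version>0.6.2</tephra.version>
    </properties>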

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc9929b5/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConnectionQueryServicesTestImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConnectionQueryServicesTestImpl.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConnectionQueryServicesTestImpl.java
index bee8d21..d4f7c16 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConnectionQueryServicesTestImpl.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConnectionQueryServicesTestImpl.java
@@ -35,8 +35,8 @@ import org.apache.phoenix.query.QueryServices;
 public class ConnectionQueryServicesTestImpl extends ConnectionQueryServicesImpl {
     protected int NUM_SLAVES_BASE = 1; // number of slaves for the cluster
     
-    public ConnectionQueryServicesTestImpl(QueryServices services, ConnectionInfo info) throws SQLException {
-        super(services, info, null);
+    public ConnectionQueryServicesTestImpl(QueryServices services, ConnectionInfo info, Properties props) throws SQLException {
+        super(services, info, props);
     }
     
     @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc9929b5/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorWithSplitsAndMultiCFIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorWithSplitsAndMultiCFIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorWithSplitsAndMultiCFIT.java
index 88a8215..6191a4f 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorWithSplitsAndMultiCFIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/StatsCollectorWithSplitsAndMultiCFIT.java
@@ -133,7 +133,7 @@ public class StatsCollectorWithSplitsAndMultiCFIT extends StatsCollectorAbstract
         Collection<GuidePostsInfo> infos = TestUtil.getGuidePostsList(conn, STATS_TEST_TABLE_NAME_NEW);
         long[] rowCountArr = new long[]{25, 20, 25, 25};
         // CF A alone has double the bytecount because it has column qualifier A and column qualifier _0
-        long[] byteCountArr = new long[]{12120, 5540, 6652, 6652};
+        long[] byteCountArr = new long[]{12144, 5540, 6652, 6652};
         int i = 0;
         for(GuidePostsInfo info : infos) {
             assertRowCountAndByteCount(info, rowCountArr[i], byteCountArr[i]);
@@ -148,8 +148,8 @@ public class StatsCollectorWithSplitsAndMultiCFIT extends StatsCollectorAbstract
     }
 
     protected void assertRowCountAndByteCount(GuidePostsInfo info, long rowCount, long byteCount) {
-        assertEquals(info.getRowCount(), rowCount);
-        assertEquals(info.getByteCount(), byteCount);
+        assertEquals("Row count does not match ", rowCount, info.getRowCount());
+        assertEquals("Byte count does not match ", byteCount, info.getByteCount());
     }
 
     @Test
@@ -176,7 +176,7 @@ public class StatsCollectorWithSplitsAndMultiCFIT extends StatsCollectorAbstract
         Collection<GuidePostsInfo> infos = TestUtil.getGuidePostsList(conn, STATS_TEST_TABLE_NAME);
         for (GuidePostsInfo info : infos) {
             assertEquals(20, info.getRowCount());
-            assertEquals(11020, info.getByteCount());
+            assertEquals(11040, info.getByteCount());
             break;
         }
         List<KeyRange> keyRanges = getAllSplits(conn, STATS_TEST_TABLE_NAME);
@@ -269,7 +269,7 @@ public class StatsCollectorWithSplitsAndMultiCFIT extends StatsCollectorAbstract
         infos = TestUtil.getGuidePostsList(conn, STATS_TEST_TABLE_NAME);
         for (GuidePostsInfo info : infos) {
             assertEquals(20, info.getRowCount());
-            assertEquals(9918, info.getByteCount());
+            assertEquals(9936, info.getByteCount());
             break;
         }
         conn.close();
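
Beyond the updated guidepost byte counts, the assertion changes here follow
the standard JUnit convention: assertEquals takes (expected, actual),
optionally preceded by a message. With the arguments swapped, as before this
patch, a failure reports the two values reversed and is harder to read:

    // Swapped order: a failure prints "expected:<9936> but was:<20>",
    // inverting what actually happened.
    assertEquals(info.getByteCount(), byteCount);
    // Patched form: (message, expected, actual), so failures read correctly.
    assertEquals("Byte count does not match ", byteCount, info.getByteCount());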

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc9929b5/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java
index 240d335..b03037c 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ImmutableIndexIT.java
@@ -48,17 +48,28 @@ public class ImmutableIndexIT extends BaseHBaseManagedTimeIT {
 	
 	private final boolean localIndex;
 	private final String tableDDLOptions;
+	private final String tableName;
+    private final String indexName;
+    private final String fullTableName;
+    private final String fullIndexName;
 	
-	public ImmutableIndexIT(boolean localIndex) {
+	public ImmutableIndexIT(boolean localIndex, boolean transactional) {
 		this.localIndex = localIndex;
 		StringBuilder optionBuilder = new StringBuilder("IMMUTABLE_ROWS=true");
+		if (transactional) {
+			optionBuilder.append(", TRANSACTIONAL=true");
+		}
 		this.tableDDLOptions = optionBuilder.toString();
+		this.tableName = TestUtil.DEFAULT_DATA_TABLE_NAME + ( transactional ?  "_TXN" : "");
+        this.indexName = "IDX" + ( transactional ?  "_TXN" : "");
+        this.fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName);
+        this.fullIndexName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, indexName);
 	}
 	
-	@Parameters(name="localIndex = {0}")
+	@Parameters(name="localIndex = {0} , transactional = {1}")
     public static Collection<Boolean[]> data() {
         return Arrays.asList(new Boolean[][] {     
-                 { false}, { true }
+                 { false, false }, { false, true }, { true, false }, { true, true }
            });
     }
    
@@ -67,11 +78,6 @@ public class ImmutableIndexIT extends BaseHBaseManagedTimeIT {
     	Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
         try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
 	        conn.setAutoCommit(false);
-	        // create unique table and index names for each parameterized test
-	        String tableName = TestUtil.DEFAULT_DATA_TABLE_NAME + "_" + System.currentTimeMillis();
-	        String indexName = "IDX"  + "_" + System.currentTimeMillis();
-	        String fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName);
-	        String fullIndexName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, indexName);
 	        String ddl ="CREATE TABLE " + fullTableName + BaseTest.TEST_TABLE_SCHEMA + tableDDLOptions;
 	        Statement stmt = conn.createStatement();
 	        stmt.execute(ddl);
@@ -102,4 +108,4 @@ public class ImmutableIndexIT extends BaseHBaseManagedTimeIT {
     }
     
 
-}
\ No newline at end of file
+}
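
The refactoring above replaces timestamp-suffixed table names with
deterministic per-parameter names, so each (localIndex, transactional)
combination gets its own stable table and index. The JUnit wiring, reduced to
its essentials (class and names illustrative):

    import java.util.Arrays;
    import java.util.Collection;

    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.junit.runners.Parameterized;
    import org.junit.runners.Parameterized.Parameters;

    @RunWith(Parameterized.class)
    public class ExampleIT {
        private final String tableName;

        // JUnit constructs the class once per row returned by data()
        public ExampleIT(boolean transactional) {
            this.tableName = "T" + (transactional ? "_TXN" : "");
        }

        @Parameters(name = "transactional = {0}")
        public static Collection<Boolean[]> data() {
            return Arrays.asList(new Boolean[][] { { false }, { true } });
        }

        @Test
        public void example() {
            // table-per-parameter: every test in the class uses tableName
        }
    }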

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc9929b5/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexExpressionIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexExpressionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexExpressionIT.java
index e3d1db7..7da0d85 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexExpressionIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexExpressionIT.java
@@ -38,6 +38,7 @@ import org.apache.phoenix.util.DateUtil;
 import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.QueryUtil;
+import org.apache.phoenix.util.TestUtil;
 import org.junit.Test;
 
 public class IndexExpressionIT extends BaseHBaseManagedTimeIT {
@@ -75,7 +76,7 @@ public class IndexExpressionIT extends BaseHBaseManagedTimeIT {
         stmt.setInt(3, i);
         stmt.setLong(4, i);
         stmt.setBigDecimal(5, new BigDecimal(i*0.5d));
-        Date date = new Date(DateUtil.parseDate("2015-01-01 00:00:00").getTime() + (i - 1) * NUM_MILLIS_IN_DAY);
+        Date date = new Date(DateUtil.parseDate("2015-01-01 00:00:00").getTime() + (i - 1) * TestUtil.NUM_MILLIS_IN_DAY);
         stmt.setDate(6, date);
         stmt.setString(7, "a.varchar" + String.valueOf(i));
         stmt.setString(8, "a.char" + String.valueOf(i));
@@ -98,7 +99,7 @@ public class IndexExpressionIT extends BaseHBaseManagedTimeIT {
                 + "_A.VARCHAR" + String.valueOf(i) + "_" + StringUtils.rightPad("B.CHAR" + String.valueOf(i), 10, ' '),
                 rs.getString(1));
         assertEquals(i * 3, rs.getInt(2));
-        Date date = new Date(DateUtil.parseDate("2015-01-01 00:00:00").getTime() + (i) * NUM_MILLIS_IN_DAY);
+        Date date = new Date(DateUtil.parseDate("2015-01-01 00:00:00").getTime() + (i) * TestUtil.NUM_MILLIS_IN_DAY);
         assertEquals(date, rs.getDate(3));
         assertEquals(date, rs.getDate(4));
         assertEquals(date, rs.getDate(5));

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc9929b5/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexIT.java
index 695225f..d2c6267 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/IndexIT.java
@@ -33,7 +33,6 @@ import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.TestUtil;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -44,20 +43,33 @@ public class IndexIT extends BaseHBaseManagedTimeIT {
 	
 	private final boolean localIndex;
 	private final String tableDDLOptions;
+	private final String tableName;
+    private final String indexName;
+    private final String fullTableName;
+    private final String fullIndexName;
 	
-	public IndexIT(boolean localIndex, boolean mutable) {
+	public IndexIT(boolean localIndex, boolean mutable, boolean transactional) {
 		this.localIndex = localIndex;
 		StringBuilder optionBuilder = new StringBuilder();
 		if (!mutable) 
 			optionBuilder.append(" IMMUTABLE_ROWS=true ");
+		if (transactional) {
+			if (!(optionBuilder.length()==0))
+				optionBuilder.append(",");
+			optionBuilder.append(" TRANSACTIONAL=true ");
+		}
 		this.tableDDLOptions = optionBuilder.toString();
+		this.tableName = TestUtil.DEFAULT_DATA_TABLE_NAME + ( transactional ?  "_TXN" : "");
+        this.indexName = "IDX" + ( transactional ?  "_TXN" : "");
+        this.fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName);
+        this.fullIndexName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, indexName);
 	}
 	
-	@Parameters(name="localIndex = {0} , mutable = {1}")
+	@Parameters(name="localIndex = {0} , mutable = {1} , transactional = {2}")
     public static Collection<Boolean[]> data() {
         return Arrays.asList(new Boolean[][] {     
-                 { false, false }, { false, true },
-                 { true, false }, { true, true }
+                 { false, false, false }, { false, false, true }, { false, true, false }, { false, true, true }, 
+                 { true, false, false }, { true, false, true }, { true, true, false }, { true, true, true }
            });
     }
 
@@ -66,10 +78,6 @@ public class IndexIT extends BaseHBaseManagedTimeIT {
         Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
         try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
 	        conn.setAutoCommit(false);
-	        // create unique table and index names for each parameterized test
-	        String tableName = TestUtil.DEFAULT_DATA_TABLE_NAME + "_" + System.currentTimeMillis();
-	        String indexName = "IDX"  + "_" + System.currentTimeMillis();
-	        String fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName);
 	        String ddl ="CREATE TABLE " + fullTableName + BaseTest.TEST_TABLE_SCHEMA + tableDDLOptions;
 	        Statement stmt = conn.createStatement();
 	        stmt.execute(ddl);
@@ -122,16 +130,10 @@ public class IndexIT extends BaseHBaseManagedTimeIT {
     }
     
     @Test
-    @Ignore("Failing due to zero byte incorrectly being stripped from row key") // FIXME: fixed in master, so remove this ignore tag when merged.
     public void testDeleteFromAllPKColumnIndex() throws Exception {
         Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
         try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
 	        conn.setAutoCommit(false);
-	        // create unique table and index names for each parameterized test
-	        String tableName = TestUtil.DEFAULT_DATA_TABLE_NAME + "_" + System.currentTimeMillis();
-	        String indexName = "IDX"  + "_" + System.currentTimeMillis();
-	        String fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName);
-	        String fullIndexName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, indexName);
 	        String ddl ="CREATE TABLE " + fullTableName + BaseTest.TEST_TABLE_SCHEMA + tableDDLOptions;
 	        Statement stmt = conn.createStatement();
 	        stmt.execute(ddl);
@@ -185,11 +187,6 @@ public class IndexIT extends BaseHBaseManagedTimeIT {
     @Test
     public void testDeleteFromNonPKColumnIndex() throws Exception {
         Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        // create unique table and index names for each parameterized test
-        String tableName = TestUtil.DEFAULT_DATA_TABLE_NAME + "_" + System.currentTimeMillis();
-        String indexName = "IDX"  + "_" + System.currentTimeMillis();
-        String fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName);
-        String fullIndexName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, indexName);
         String ddl ="CREATE TABLE " + fullTableName + BaseTest.TEST_TABLE_SCHEMA + tableDDLOptions;
         try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
 	        conn.setAutoCommit(false);
@@ -242,10 +239,6 @@ public class IndexIT extends BaseHBaseManagedTimeIT {
     	Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
     	try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
 	        conn.setAutoCommit(false);
-	        // create unique table and index names for each parameterized test
-	        String tableName = TestUtil.DEFAULT_DATA_TABLE_NAME + "_" + System.currentTimeMillis();
-	        String indexName = "IDX"  + "_" + System.currentTimeMillis();
-	        String fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName);
 	        String ddl ="CREATE TABLE " + fullTableName + BaseTest.TEST_TABLE_SCHEMA + tableDDLOptions;
 	        Statement stmt = conn.createStatement();
 	        stmt.execute(ddl);
@@ -264,10 +257,6 @@ public class IndexIT extends BaseHBaseManagedTimeIT {
     	Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
     	try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
 	        conn.setAutoCommit(false);
-	        // create unique table and index names for each parameterized test
-	        String tableName = TestUtil.DEFAULT_DATA_TABLE_NAME + "_" + System.currentTimeMillis();
-	        String indexName = "IDX"  + "_" + System.currentTimeMillis();
-	        String fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName);
 	        String ddl ="CREATE TABLE " + fullTableName + BaseTest.TEST_TABLE_SCHEMA + tableDDLOptions;
 	        Statement stmt = conn.createStatement();
 	        stmt.execute(ddl);
@@ -291,10 +280,6 @@ public class IndexIT extends BaseHBaseManagedTimeIT {
     	Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
     	try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
 	        conn.setAutoCommit(false);
-	        // create unique table and index names for each parameterized test
-	        String tableName = TestUtil.DEFAULT_DATA_TABLE_NAME + "_" + System.currentTimeMillis();
-	        String indexName = "IDX"  + "_" + System.currentTimeMillis();
-	        String fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName);
 	        String ddl ="CREATE TABLE " + fullTableName + BaseTest.TEST_TABLE_SCHEMA + tableDDLOptions;
 	        Statement stmt = conn.createStatement();
 	        stmt.execute(ddl);
@@ -319,11 +304,6 @@ public class IndexIT extends BaseHBaseManagedTimeIT {
 	        conn.setAutoCommit(false);
 	        String query;
             ResultSet rs;
-	        // create unique table and index names for each parameterized test
-	        String tableName = TestUtil.DEFAULT_DATA_TABLE_NAME + "_" + System.currentTimeMillis();
-	        String indexName = "IDX"  + "_" + System.currentTimeMillis();
-	        String fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName);
-	        String fullIndexName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, indexName);
 	        String ddl ="CREATE TABLE " + fullTableName 
 	        		+ " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR) DEFAULT_COLUMN_FAMILY='A'" + (!tableDDLOptions.isEmpty() ? "," + tableDDLOptions : "");
 	        Statement stmt = conn.createStatement();
@@ -361,11 +341,6 @@ public class IndexIT extends BaseHBaseManagedTimeIT {
     	Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
         try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
 	        conn.setAutoCommit(false);
-	        // create unique table and index names for each parameterized test
-	        String tableName = TestUtil.DEFAULT_DATA_TABLE_NAME + "_" + System.currentTimeMillis();
-	        String indexName = "IDX"  + "_" + System.currentTimeMillis();
-	        String fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName);
-	        String fullIndexName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, indexName);
             Date date = new Date(System.currentTimeMillis());
             
             createMultiCFTestTable(fullTableName, tableDDLOptions);
@@ -423,12 +398,6 @@ public class IndexIT extends BaseHBaseManagedTimeIT {
 	        conn.setAutoCommit(false);
 	        String query;
 	        ResultSet rs;
-	        // create unique table and index names for each parameterized test
-	        String tableName = TestUtil.DEFAULT_DATA_TABLE_NAME + "_" + System.currentTimeMillis();
-	        String indexName = "IDX"  + "_" + System.currentTimeMillis();
-	        String fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName);
-	        String fullIndexName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, indexName);
-	        
 	        String ddl = "CREATE TABLE " + fullTableName + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR) " + tableDDLOptions;
 			conn.createStatement().execute(ddl);
 	        query = "SELECT * FROM " + fullTableName;
@@ -504,12 +473,6 @@ public class IndexIT extends BaseHBaseManagedTimeIT {
 	        conn.setAutoCommit(false);
 	        String query;
 	        ResultSet rs;
-	        // create unique table and index names for each parameterized test
-	        String tableName = TestUtil.DEFAULT_DATA_TABLE_NAME + "_" + System.currentTimeMillis();
-	        String indexName = "IDX"  + "_" + System.currentTimeMillis();
-	        String fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName);
-	        String fullIndexName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, indexName);
-	        
 	        String ddl = "CREATE TABLE " + fullTableName + " (k VARCHAR NOT NULL PRIMARY KEY, a.v1 VARCHAR, a.v2 VARCHAR, b.v1 VARCHAR) " + tableDDLOptions;
 			conn.createStatement().execute(ddl);
 	        query = "SELECT * FROM " + fullTableName;
@@ -567,12 +530,6 @@ public class IndexIT extends BaseHBaseManagedTimeIT {
 	        conn.setAutoCommit(false);
 	        String query;
 	        ResultSet rs;
-	        // create unique table and index names for each parameterized test
-	        String tableName = TestUtil.DEFAULT_DATA_TABLE_NAME + "_" + System.currentTimeMillis();
-	        String indexName = "IDX"  + "_" + System.currentTimeMillis();
-	        String fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName);
-	        String fullIndexName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, indexName);
-    
 	        // make sure that the tables are empty, but reachable
 	        conn.createStatement().execute(
 	          "CREATE TABLE " + fullTableName
@@ -629,34 +586,29 @@ public class IndexIT extends BaseHBaseManagedTimeIT {
     @Test
     public void testMultipleUpdatesAcrossRegions() throws Exception {
     	Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+    	String testTable = fullTableName+"_MULTIPLE_UPDATES";
         try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
 	        conn.setAutoCommit(false);
 	        String query;
 	        ResultSet rs;
-	        // create unique table and index names for each parameterized test
-	        String tableName = TestUtil.DEFAULT_DATA_TABLE_NAME + "_" + System.currentTimeMillis();
-	        String indexName = "IDX"  + "_" + System.currentTimeMillis();
-	        String fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName);
-	        String fullIndexName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, indexName);
-    
 	        // make sure that the tables are empty, but reachable
 	        conn.createStatement().execute(
-	          "CREATE TABLE " + fullTableName
+	          "CREATE TABLE " + testTable
 	              + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR) "  + HTableDescriptor.MAX_FILESIZE + "=1, " + HTableDescriptor.MEMSTORE_FLUSHSIZE + "=1 " 
 	        		  + (!tableDDLOptions.isEmpty() ? "," + tableDDLOptions : "") + "SPLIT ON ('b')");
-	        query = "SELECT * FROM " + fullTableName;
+	        query = "SELECT * FROM " + testTable;
 	        rs = conn.createStatement().executeQuery(query);
 	        assertFalse(rs.next());
 	
 	        conn.createStatement().execute(
-	  	          "CREATE " + (localIndex ? "LOCAL " : "") + "INDEX " + indexName + " ON " + fullTableName + " (v1, v2)");
+	  	          "CREATE " + (localIndex ? "LOCAL " : "") + "INDEX " + indexName + " ON " + testTable + " (v1, v2)");
 	        query = "SELECT * FROM " + fullIndexName;
 	        rs = conn.createStatement().executeQuery(query);
 	        assertFalse(rs.next());
 	    
 	        // load some data into the table
 	        PreparedStatement stmt =
-	            conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)");
+	            conn.prepareStatement("UPSERT INTO " + testTable + " VALUES(?,?,?)");
 	        stmt.setString(1, "a");
 	        stmt.setString(2, "x");
 	        stmt.setString(3, "1");
@@ -688,10 +640,10 @@ public class IndexIT extends BaseHBaseManagedTimeIT {
 	        assertEquals("c", rs.getString(3));
 	        assertFalse(rs.next());
 	
-	        query = "SELECT * FROM " + fullTableName;
+	        query = "SELECT * FROM " + testTable;
 	        rs = conn.createStatement().executeQuery("EXPLAIN " + query);
 	        if (localIndex) {
-	            assertEquals("CLIENT PARALLEL 2-WAY RANGE SCAN OVER _LOCAL_IDX_" + fullTableName+" [-32768]\n"
+	            assertEquals("CLIENT PARALLEL 2-WAY RANGE SCAN OVER _LOCAL_IDX_" + testTable+" [-32768]\n"
 	                       + "    SERVER FILTER BY FIRST KEY ONLY\n"
 	                       + "CLIENT MERGE SORT",
 	                QueryUtil.getExplainPlan(rs));
@@ -726,12 +678,6 @@ public class IndexIT extends BaseHBaseManagedTimeIT {
 	        conn.setAutoCommit(false);
 	        String query;
 	        ResultSet rs;
-	        // create unique table and index names for each parameterized test
-	        String tableName = TestUtil.DEFAULT_DATA_TABLE_NAME + "_" + System.currentTimeMillis();
-	        String indexName = "IDX"  + "_" + System.currentTimeMillis();
-	        String fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName);
-	        String fullIndexName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, indexName);
-	        
             conn.createStatement().execute("CREATE TABLE " + fullTableName + " (k VARCHAR NOT NULL PRIMARY KEY, \"V1\" VARCHAR, \"v2\" VARCHAR)"+tableDDLOptions);
             query = "SELECT * FROM "+fullTableName;
             rs = conn.createStatement().executeQuery(query);
@@ -815,11 +761,6 @@ public class IndexIT extends BaseHBaseManagedTimeIT {
 	        conn.setAutoCommit(false);
 	        String query;
 	        ResultSet rs;
-	        // create unique table and index names for each parameterized test
-	        String tableName = TestUtil.DEFAULT_DATA_TABLE_NAME + "_" + System.currentTimeMillis();
-	        String indexName = "IDX"  + "_" + System.currentTimeMillis();
-	        String fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName);
-	        
 	        String ddl = "CREATE TABLE " + fullTableName +"  (PK1 CHAR(2) NOT NULL PRIMARY KEY, CF1.COL1 BIGINT) " + tableDDLOptions;
 	        conn.createStatement().execute(ddl);
 	        ddl = "CREATE " + (localIndex ? "LOCAL " : "") + "INDEX " + indexName + " ON " + fullTableName + "(COL1)";
@@ -838,11 +779,6 @@ public class IndexIT extends BaseHBaseManagedTimeIT {
 	        conn.setAutoCommit(false);
 	        String query;
 	        ResultSet rs;
-	        // create unique table and index names for each parameterized test
-	        String tableName = TestUtil.DEFAULT_DATA_TABLE_NAME + "_" + System.currentTimeMillis();
-	        String indexName = "IDX"  + "_" + System.currentTimeMillis();
-	        String fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName);
-	        String fullIndexName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, indexName);
             Date date = new Date(System.currentTimeMillis());
             
             createMultiCFTestTable(fullTableName, tableDDLOptions);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/cc9929b5/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
index 364b358..39c90be 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexFailureIT.java
@@ -26,11 +26,13 @@ import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
 import java.util.Properties;
@@ -69,11 +71,15 @@ import org.apache.phoenix.util.QueryUtil;
 import org.apache.phoenix.util.ReadOnlyProps;
 import org.apache.phoenix.util.SchemaUtil;
 import org.apache.phoenix.util.StringUtil;
+import org.apache.phoenix.util.TestUtil;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
 /**
  * 
  * Test for failure of region server to write to index table.
@@ -85,18 +91,30 @@ import org.junit.experimental.categories.Category;
  */
 
 @Category(NeedsOwnMiniClusterTest.class)
+@RunWith(Parameterized.class)
 public class MutableIndexFailureIT extends BaseTest {
     private static final int NUM_SLAVES = 4;
-    private static String url;
     private static PhoenixTestDriver driver;
     private static HBaseTestingUtility util;
     private Timer scheduleTimer;
 
-    private static final String SCHEMA_NAME = "S";
-    private static final String INDEX_TABLE_NAME = "I";
-    private static final String DATA_TABLE_FULL_NAME = SchemaUtil.getTableName(SCHEMA_NAME, "T");
-    private static final String INDEX_TABLE_FULL_NAME = SchemaUtil.getTableName(SCHEMA_NAME, "I");
+    private String tableName;
+    private String indexName;
+    private String fullTableName;
+    private String fullIndexName;
 
+    private boolean transactional;
+    private final String tableDDLOptions;
+
+    public MutableIndexFailureIT(boolean transactional) {
+        this.transactional = transactional;
+        this.tableDDLOptions = transactional ? " TRANSACTIONAL=true " : "";
+        this.tableName = TestUtil.DEFAULT_DATA_TABLE_NAME;
+        this.indexName = "IDX";
+        this.fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName);
+        this.fullIndexName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, indexName);
+    }
+    
     @Before
     public void doSetup() throws Exception {
         Configuration conf = HBaseConfiguration.create();
@@ -114,8 +132,15 @@ public class MutableIndexFailureIT extends BaseTest {
         url = JDBC_PROTOCOL + JDBC_PROTOCOL_SEPARATOR + LOCALHOST + JDBC_PROTOCOL_SEPARATOR + clientPort
                 + JDBC_PROTOCOL_TERMINATOR + PHOENIX_TEST_DRIVER_URL_PARAM;
         driver = initAndRegisterDriver(url, ReadOnlyProps.EMPTY_PROPS);
+        clusterInitialized = true;
+        setupTxManager();
     }
-
+    
+    @Parameters(name = "transactional = {0}")
+    public static Collection<Boolean[]> data() {
+        return Arrays.asList(new Boolean[][] { { false }, { true } });
+    }
+    
     @After
     public void tearDown() throws Exception {
         try {
@@ -135,230 +160,279 @@ public class MutableIndexFailureIT extends BaseTest {
     @Ignore("See PHOENIX-2331")
     @Test(timeout=300000)
     public void testWriteFailureDisablesLocalIndex() throws Exception {
-        testWriteFailureDisablesIndex(true);
+        helpTestWriteFailureDisablesIndex(true);
     }
  
     @Ignore("See PHOENIX-2332")
     @Test(timeout=300000)
     public void testWriteFailureDisablesIndex() throws Exception {
-        testWriteFailureDisablesIndex(false);
+        helpTestWriteFailureDisablesIndex(false);
     }
     
-    public void testWriteFailureDisablesIndex(boolean localIndex) throws Exception {
-        String query;
-        ResultSet rs;
-
+    public void helpTestWriteFailureDisablesIndex(boolean localIndex) throws Exception {
         Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        Connection conn = driver.connect(url, props);
-        conn.setAutoCommit(false);
-        conn.createStatement().execute(
-                "CREATE TABLE " + DATA_TABLE_FULL_NAME + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)");
-        query = "SELECT * FROM " + DATA_TABLE_FULL_NAME;
-        rs = conn.createStatement().executeQuery(query);
-        assertFalse(rs.next());
-
-        if(localIndex) {
+        try (Connection conn = driver.connect(url, props)) {
+            String query;
+            ResultSet rs;
+            conn.setAutoCommit(false);
             conn.createStatement().execute(
-                "CREATE LOCAL INDEX " + INDEX_TABLE_NAME + " ON " + DATA_TABLE_FULL_NAME + " (v1) INCLUDE (v2)");
-            conn.createStatement().execute(
-                "CREATE LOCAL INDEX " + INDEX_TABLE_NAME+ "_2" + " ON " + DATA_TABLE_FULL_NAME + " (v2) INCLUDE (v1)");
-        } else {
-            conn.createStatement().execute(
-                "CREATE INDEX " + INDEX_TABLE_NAME + " ON " + DATA_TABLE_FULL_NAME + " (v1) INCLUDE (v2)");
-        }
+                    "CREATE TABLE " + fullTableName + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR) " + tableDDLOptions);
+            query = "SELECT * FROM " + fullTableName;
+            rs = conn.createStatement().executeQuery(query);
+            assertFalse(rs.next());
+    
+            if(localIndex) {
+                conn.createStatement().execute(
+                    "CREATE LOCAL INDEX " + indexName + " ON " + fullTableName + " (v1) INCLUDE (v2)");
+                conn.createStatement().execute(
+                    "CREATE LOCAL INDEX " + indexName+ "_2" + " ON " + fullTableName + " (v2) INCLUDE (v1)");
+            } else {
+                conn.createStatement().execute(
+                    "CREATE INDEX " + indexName + " ON " + fullTableName + " (v1) INCLUDE (v2)");
+            }
+                
+            query = "SELECT * FROM " + fullIndexName;
+            rs = conn.createStatement().executeQuery(query);
+            assertFalse(rs.next());
+    
+            // Verify the metadata for index is correct.
+            rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(TestUtil.DEFAULT_SCHEMA_NAME), indexName,
+                    new String[] { PTableType.INDEX.toString() });
+            assertTrue(rs.next());
+            assertEquals(indexName, rs.getString(3));
+            assertEquals(PIndexState.ACTIVE.toString(), rs.getString("INDEX_STATE"));
+            assertFalse(rs.next());
             
-        query = "SELECT * FROM " + INDEX_TABLE_FULL_NAME;
-        rs = conn.createStatement().executeQuery(query);
-        assertFalse(rs.next());
-
-        // Verify the metadata for index is correct.
-        rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(SCHEMA_NAME), INDEX_TABLE_NAME,
-                new String[] { PTableType.INDEX.toString() });
-        assertTrue(rs.next());
-        assertEquals(INDEX_TABLE_NAME, rs.getString(3));
-        assertEquals(PIndexState.ACTIVE.toString(), rs.getString("INDEX_STATE"));
-        assertFalse(rs.next());
-        
-        PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + DATA_TABLE_FULL_NAME + " VALUES(?,?,?)");
-        stmt.setString(1, "a");
-        stmt.setString(2, "x");
-        stmt.setString(3, "1");
-        stmt.execute();
-        conn.commit();
-
-        TableName indexTable =
-                TableName.valueOf(localIndex ? MetaDataUtil
-                        .getLocalIndexTableName(DATA_TABLE_FULL_NAME) : INDEX_TABLE_FULL_NAME);
-        HBaseAdmin admin = this.util.getHBaseAdmin();
-        HTableDescriptor indexTableDesc = admin.getTableDescriptor(indexTable);
-        try{
-          admin.disableTable(indexTable);
-          admin.deleteTable(indexTable);
-        } catch (TableNotFoundException ignore) {}
-
-        stmt = conn.prepareStatement("UPSERT INTO " + DATA_TABLE_FULL_NAME + " VALUES(?,?,?)");
-        stmt.setString(1, "a2");
-        stmt.setString(2, "x2");
-        stmt.setString(3, "2");
-        stmt.execute();
-        try {
+            PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)");
+            stmt.setString(1, "a");
+            stmt.setString(2, "x");
+            stmt.setString(3, "1");
+            stmt.execute();
             conn.commit();
-        } catch (SQLException e) {}
-
-        // Verify the metadata for index is correct.
-        rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(SCHEMA_NAME), INDEX_TABLE_NAME,
-                new String[] { PTableType.INDEX.toString() });
-        assertTrue(rs.next());
-        assertEquals(INDEX_TABLE_NAME, rs.getString(3));
-        assertEquals(PIndexState.DISABLE.toString(), rs.getString("INDEX_STATE"));
-        assertFalse(rs.next());
-        if(localIndex) {
-            rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(SCHEMA_NAME), INDEX_TABLE_NAME+"_2",
-                new String[] { PTableType.INDEX.toString() });
+    
+            TableName indexTable =
+                    TableName.valueOf(localIndex ? MetaDataUtil
+                            .getLocalIndexTableName(fullTableName) : fullIndexName);
+            HBaseAdmin admin = this.util.getHBaseAdmin();
+            HTableDescriptor indexTableDesc = admin.getTableDescriptor(indexTable);
+            try {
+              admin.disableTable(indexTable);
+              admin.deleteTable(indexTable);
+            } catch (TableNotFoundException ignore) {}
+    
+            stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)");
+            stmt.setString(1, "a2");
+            stmt.setString(2, "x2");
+            stmt.setString(3, "2");
+            stmt.execute();
+            if (transactional) {
+                try {
+                    conn.commit();
+                    fail();
+                } catch (SQLException e) {
+                    conn.rollback();
+                }
+            } else {
+                conn.commit();
+            }
+    
+            // Verify the metadata for index is correct.
+            rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(TestUtil.DEFAULT_SCHEMA_NAME), indexName,
+                    new String[] { PTableType.INDEX.toString() });
             assertTrue(rs.next());
-            assertEquals(INDEX_TABLE_NAME+"_2", rs.getString(3));
-            assertEquals(PIndexState.DISABLE.toString(), rs.getString("INDEX_STATE"));
+            assertEquals(indexName, rs.getString(3));
+            // the index is only disabled for non-txn tables upon index table write failure
+            PIndexState indexState =  transactional ? PIndexState.ACTIVE : PIndexState.DISABLE;
+            assertEquals(indexState.toString(), rs.getString("INDEX_STATE"));
             assertFalse(rs.next());
-        }
-
-        // Verify UPSERT on data table still work after index is disabled       
-        stmt = conn.prepareStatement("UPSERT INTO " + DATA_TABLE_FULL_NAME + " VALUES(?,?,?)");
-        stmt.setString(1, "a3");
-        stmt.setString(2, "x3");
-        stmt.setString(3, "3");
-        stmt.execute();
-        conn.commit();
-        
-        query = "SELECT v2 FROM " + DATA_TABLE_FULL_NAME + " where v1='x3'";
-        rs = conn.createStatement().executeQuery("EXPLAIN " + query);
-        assertTrue(QueryUtil.getExplainPlan(rs).contains("CLIENT PARALLEL 1-WAY FULL SCAN OVER " + DATA_TABLE_FULL_NAME));
-        rs = conn.createStatement().executeQuery(query);
-        assertTrue(rs.next());
-        
-        // recreate index table
-        admin.createTable(indexTableDesc);
-        do {
-          Thread.sleep(15 * 1000); // sleep 15 secs
-          rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(SCHEMA_NAME), INDEX_TABLE_NAME,
-              new String[] { PTableType.INDEX.toString() });
-          assertTrue(rs.next());
-          if(PIndexState.ACTIVE.toString().equals(rs.getString("INDEX_STATE"))){
-              break;
-          }
-          if(localIndex) {
-              rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(SCHEMA_NAME), INDEX_TABLE_NAME+"_2",
+            if(localIndex) {
+                rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(TestUtil.DEFAULT_SCHEMA_NAME), indexName + "_2",
+                    new String[] { PTableType.INDEX.toString() });
+                assertTrue(rs.next());
+                assertEquals(indexName + "_2", rs.getString(3));
+                assertEquals(indexState.toString(), rs.getString("INDEX_STATE"));
+                assertFalse(rs.next());
+            }
+    
+            // if the table is transactional, the write to the index table will fail because
+            // the index has not been disabled
+            if (!transactional) {
+                // Verify UPSERT on data table still works after index is disabled
+                stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)");
+                stmt.setString(1, "a3");
+                stmt.setString(2, "x3");
+                stmt.setString(3, "3");
+                stmt.execute();
+                conn.commit();
+            }
+            
+            if (transactional) {
+                // if the table was transactional, there should be 1 row (written before the
+                // index table was dropped; the failed commit was rolled back)
+                query = "SELECT /*+ NO_INDEX */ v2 FROM " + fullTableName;
+                rs = conn.createStatement().executeQuery("EXPLAIN " + query);
+                String expectedPlan =
+                        "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + fullTableName;
+                assertEquals(expectedPlan, QueryUtil.getExplainPlan(rs));
+                rs = conn.createStatement().executeQuery(query);
+                assertTrue(rs.next());
+                assertEquals("1", rs.getString(1));
+                assertFalse(rs.next());
+            } else {
+                // if the table was not transactional, there should be three rows (all writes
+                // to the data table should succeed)
+                query = "SELECT v2 FROM " + fullTableName;
+                rs = conn.createStatement().executeQuery("EXPLAIN " + query);
+                String expectedPlan =
+                        "CLIENT PARALLEL 1-WAY FULL SCAN OVER " + fullTableName;
+                assertEquals(expectedPlan, QueryUtil.getExplainPlan(rs));
+                rs = conn.createStatement().executeQuery(query);
+                assertTrue(rs.next());
+                assertEquals("1", rs.getString(1));
+                assertTrue(rs.next());
+                assertEquals("2", rs.getString(1));
+                assertTrue(rs.next());
+                assertEquals("3", rs.getString(1));
+                assertFalse(rs.next());
+            }
+            
+            // recreate index table
+            admin.createTable(indexTableDesc);
+            do {
+              Thread.sleep(15 * 1000); // sleep 15 secs
+              rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(TestUtil.DEFAULT_SCHEMA_NAME), indexName,
                   new String[] { PTableType.INDEX.toString() });
               assertTrue(rs.next());
               if(PIndexState.ACTIVE.toString().equals(rs.getString("INDEX_STATE"))){
                   break;
               }
-          }
-        } while(true);
-        
-        // verify index table has data
-        query = "SELECT count(1) FROM " + INDEX_TABLE_FULL_NAME;
-        rs = conn.createStatement().executeQuery(query);
-        assertTrue(rs.next());
-        
-        // using 2 here because we only partially build index from where we failed and the oldest 
-        // index row has been deleted when we dropped the index table during test.
-        assertEquals(2, rs.getInt(1));
-    }
-    
-    @Test(timeout=300000)
-    public void testWriteFailureWithRegionServerDown() throws Exception {
-        String query;
-        ResultSet rs;
-
-        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
-        Connection conn = driver.connect(url, props);
-        conn.setAutoCommit(false);
-        conn.createStatement().execute(
-                "CREATE TABLE " + DATA_TABLE_FULL_NAME + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR)");
-        query = "SELECT * FROM " + DATA_TABLE_FULL_NAME;
-        rs = conn.createStatement().executeQuery(query);
-        assertFalse(rs.next());
+              if(localIndex) {
+                  rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(TestUtil.DEFAULT_SCHEMA_NAME), indexName + "_2",
+                      new String[] { PTableType.INDEX.toString() });
+                  assertTrue(rs.next());
+                  if(PIndexState.ACTIVE.toString().equals(rs.getString("INDEX_STATE"))){
+                      break;
+                  }
+              }
+            } while(true);
+            
+            // Verify UPSERT on data table still works after index table is recreated
+            stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)");
+            stmt.setString(1, "a4");
+            stmt.setString(2, "x4");
+            stmt.setString(3, "4");
+            stmt.execute();
+            conn.commit();
 
-        conn.createStatement().execute(
-                "CREATE INDEX " + INDEX_TABLE_NAME + " ON " + DATA_TABLE_FULL_NAME + " (v1) INCLUDE (v2)");
-        query = "SELECT * FROM " + INDEX_TABLE_FULL_NAME;
-        rs = conn.createStatement().executeQuery(query);
-        assertFalse(rs.next());
+            // verify index table has data
+            query = "SELECT count(1) FROM " + fullIndexName;
+            rs = conn.createStatement().executeQuery(query);
+            assertTrue(rs.next());
 
-        // Verify the metadata for index is correct.
-        rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(SCHEMA_NAME), INDEX_TABLE_NAME,
-                new String[] { PTableType.INDEX.toString() });
-        assertTrue(rs.next());
-        assertEquals(INDEX_TABLE_NAME, rs.getString(3));
-        assertEquals(PIndexState.ACTIVE.toString(), rs.getString("INDEX_STATE"));
-        assertFalse(rs.next());
-        
-        PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + DATA_TABLE_FULL_NAME + " VALUES(?,?,?)");
-        stmt.setString(1, "a");
-        stmt.setString(2, "x");
-        stmt.setString(3, "1");
-        stmt.execute();
-        conn.commit();
-        
-        // find a RS which doesn't has CATALOG table
-        TableName catalogTable = TableName.valueOf("SYSTEM.CATALOG");
-        TableName indexTable = TableName.valueOf(INDEX_TABLE_FULL_NAME);
-        final HBaseCluster cluster = this.util.getHBaseCluster();
-        Collection<ServerName> rss = cluster.getClusterStatus().getServers();
-        HBaseAdmin admin = this.util.getHBaseAdmin();
-        List<HRegionInfo> regions = admin.getTableRegions(catalogTable);
-        ServerName catalogRS = cluster.getServerHoldingRegion(regions.get(0).getTable(),
-                regions.get(0).getRegionName());
-        ServerName metaRS = cluster.getServerHoldingMeta();
-        ServerName rsToBeKilled = null;
-        
-        // find first RS isn't holding META or CATALOG table
-        for(ServerName curRS : rss) {
-            if(!curRS.equals(catalogRS) && !metaRS.equals(curRS)) {
-                rsToBeKilled = curRS;
-                break;
+            // for txn tables there will be only one row in the index (a4);
+            // for non-txn tables there will be three rows, because we only partially rebuild
+            // the index from where the write failed, and the oldest index row was deleted
+            // when we dropped the index table during the test
+            assertEquals(transactional ? 1 : 3, rs.getInt(1));
         }
     }
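
The commit/rollback branch in the method above is the behavioral crux of this change: with TRANSACTIONAL=true, a failed index write aborts the whole commit (the client must roll back, and the data table keeps none of the batch), whereas a non-transactional table keeps the data-table writes and merely flips the index to DISABLE. A minimal client-side sketch of that pattern, with a placeholder JDBC URL and table name:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.SQLException;

    public class CommitSemanticsSketch {
        public static void main(String[] args) throws SQLException {
            // Placeholder URL and table; a real run needs a live Phoenix cluster.
            try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost")) {
                conn.setAutoCommit(false);
                PreparedStatement stmt =
                        conn.prepareStatement("UPSERT INTO S.T VALUES(?,?,?)");
                stmt.setString(1, "a2");
                stmt.setString(2, "x2");
                stmt.setString(3, "2");
                stmt.execute();
                try {
                    // For a TRANSACTIONAL=true table, an index write failure
                    // surfaces here and nothing from the batch is persisted.
                    conn.commit();
                } catch (SQLException e) {
                    conn.rollback(); // required before reusing the connection
                }
            }
        }
    }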
-        assertTrue(rsToBeKilled != null);
-        
-        regions = admin.getTableRegions(indexTable);
-        final HRegionInfo indexRegion = regions.get(0);
-        final ServerName dstRS = rsToBeKilled;
-        admin.move(indexRegion.getEncodedNameAsBytes(), Bytes.toBytes(rsToBeKilled.getServerName()));
-        this.util.waitFor(30000, 200, new Waiter.Predicate<Exception>() {
-            @Override
-            public boolean evaluate() throws Exception {
-              ServerName sn = cluster.getServerHoldingRegion(indexRegion.getTable(),
-                      indexRegion.getRegionName());
-              return (sn != null && sn.equals(dstRS));
-            }
-          });
-        
-        // use timer sending updates in every 10ms
-        this.scheduleTimer = new Timer(true);
-        this.scheduleTimer.schedule(new SendingUpdatesScheduleTask(conn), 0, 10);
-        // let timer sending some updates
-        Thread.sleep(100);
         
-        // kill RS hosting index table
-        this.util.getHBaseCluster().killRegionServer(rsToBeKilled);
-        
-        // wait for index table completes recovery
-        this.util.waitUntilAllRegionsAssigned(indexTable);
+    @Test(timeout=300000)
+    public void testWriteFailureWithRegionServerDown() throws Exception {
+        String query;
+        ResultSet rs;
+
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = driver.connect(url, props)) {
+            conn.setAutoCommit(false);
+            conn.createStatement().execute(
+                    "CREATE TABLE " + fullTableName + " (k VARCHAR NOT NULL PRIMARY KEY, v1 VARCHAR, v2 VARCHAR) " + tableDDLOptions);
+            query = "SELECT * FROM " + fullTableName;
+            rs = conn.createStatement().executeQuery(query);
+            assertFalse(rs.next());
         
-        // Verify the metadata for index is correct.       
-        do {
-          Thread.sleep(15 * 1000); // sleep 15 secs
-          rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(SCHEMA_NAME), INDEX_TABLE_NAME,
-              new String[] { PTableType.INDEX.toString() });
-          assertTrue(rs.next());
-          if(PIndexState.ACTIVE.toString().equals(rs.getString("INDEX_STATE"))){
-              break;
-          }
-        } while(true);
-        this.scheduleTimer.cancel();
+            conn.createStatement().execute(
+                    "CREATE INDEX " + indexName + " ON " + fullTableName + " (v1) INCLUDE (v2)");
+            query = "SELECT * FROM " + fullIndexName;
+            rs = conn.createStatement().executeQuery(query);
+            assertFalse(rs.next());
         
-        assertEquals(cluster.getClusterStatus().getDeadServers(), 1);
+            // Verify the metadata for index is correct.
+            rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(TestUtil.DEFAULT_SCHEMA_NAME), indexName,
+                    new String[] { PTableType.INDEX.toString() });
+            assertTrue(rs.next());
+            assertEquals(indexName, rs.getString(3));
+            assertEquals(PIndexState.ACTIVE.toString(), rs.getString("INDEX_STATE"));
+            assertFalse(rs.next());
+
+            PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)");
+            stmt.setString(1, "a");
+            stmt.setString(2, "x");
+            stmt.setString(3, "1");
+            stmt.execute();
+            conn.commit();
+
+            // find an RS that doesn't host the CATALOG table
+            TableName catalogTable = TableName.valueOf("SYSTEM.CATALOG");
+            TableName indexTable = TableName.valueOf(fullIndexName);
+            final HBaseCluster cluster = this.util.getHBaseCluster();
+            Collection<ServerName> rss = cluster.getClusterStatus().getServers();
+            HBaseAdmin admin = this.util.getHBaseAdmin();
+            List<HRegionInfo> regions = admin.getTableRegions(catalogTable);
+            ServerName catalogRS = cluster.getServerHoldingRegion(regions.get(0).getTable(),
+                    regions.get(0).getRegionName());
+            ServerName metaRS = cluster.getServerHoldingMeta();
+            ServerName rsToBeKilled = null;
+
+            // find the first RS that isn't holding META or the CATALOG table
+            for(ServerName curRS : rss) {
+                if(!curRS.equals(catalogRS) && !metaRS.equals(curRS)) {
+                    rsToBeKilled = curRS;
+                    break;
+                }
+            }
+            assertTrue(rsToBeKilled != null);
+
+            regions = admin.getTableRegions(indexTable);
+            final HRegionInfo indexRegion = regions.get(0);
+            final ServerName dstRS = rsToBeKilled;
+            admin.move(indexRegion.getEncodedNameAsBytes(), Bytes.toBytes(rsToBeKilled.getServerName()));
+            this.util.waitFor(30000, 200, new Waiter.Predicate<Exception>() {
+                @Override
+                public boolean evaluate() throws Exception {
+                    ServerName sn = cluster.getServerHoldingRegion(indexRegion.getTable(),
+                            indexRegion.getRegionName());
+                    return (sn != null && sn.equals(dstRS));
+                }
+            });
+
+            // use a timer to send updates every 10ms
+            this.scheduleTimer = new Timer(true);
+            this.scheduleTimer.schedule(new SendingUpdatesScheduleTask(conn, fullTableName), 0, 10);
+            // let the timer send some updates
+            Thread.sleep(100);
+
+            // kill the RS hosting the index table
+            this.util.getHBaseCluster().killRegionServer(rsToBeKilled);
+
+            // wait for the index table to complete recovery
+            this.util.waitUntilAllRegionsAssigned(indexTable);
+
+            // Verify the metadata for index is correct.
+            do {
+                Thread.sleep(15 * 1000); // sleep 15 secs
+                rs = conn.getMetaData().getTables(null, StringUtil.escapeLike(TestUtil.DEFAULT_SCHEMA_NAME), indexName,
+                        new String[] { PTableType.INDEX.toString() });
+                assertTrue(rs.next());
+                if(PIndexState.ACTIVE.toString().equals(rs.getString("INDEX_STATE"))){
+                    break;
+                }
+            } while(true);
+            this.scheduleTimer.cancel();
+
+            assertEquals(1, cluster.getClusterStatus().getDeadServers());
+        }
     }
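
Both tests poll SYSTEM.CATALOG the same way while waiting for the background rebuild task to flip the index back to ACTIVE. The loop could be factored into a helper along these lines (a sketch; the class and method names are invented, the metadata calls are the ones the tests use):

    import java.sql.Connection;
    import java.sql.ResultSet;

    import org.apache.phoenix.schema.PIndexState;
    import org.apache.phoenix.schema.PTableType;

    public class IndexStateWaiter {
        /** Polls index metadata until INDEX_STATE reads ACTIVE. */
        static void waitForActiveIndex(Connection conn, String schema, String index)
                throws Exception {
            while (true) {
                ResultSet rs = conn.getMetaData().getTables(null, schema, index,
                        new String[] { PTableType.INDEX.toString() });
                if (rs.next()
                        && PIndexState.ACTIVE.toString().equals(rs.getString("INDEX_STATE"))) {
                    return;
                }
                Thread.sleep(15 * 1000); // same 15s cadence as the tests
            }
        }
    }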
     
     static class SendingUpdatesScheduleTask extends TimerTask {
@@ -368,10 +442,12 @@ public class MutableIndexFailureIT extends BaseTest {
         // running
         private final static AtomicInteger inProgress = new AtomicInteger(0);
         private final Connection conn;
+        private final String fullTableName;
         private int inserts = 0;
 
-        public SendingUpdatesScheduleTask(Connection conn) {
+        public SendingUpdatesScheduleTask(Connection conn, String fullTableName) {
             this.conn = conn;
+            this.fullTableName = fullTableName;
         }
 
         public void run() {
@@ -382,7 +458,7 @@ public class MutableIndexFailureIT extends BaseTest {
             try {
                 inProgress.incrementAndGet();
                 inserts++;
-                PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + DATA_TABLE_FULL_NAME + " VALUES(?,?,?)");
+                PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + fullTableName + " VALUES(?,?,?)");
                 stmt.setString(1, "a" + inserts);
                 stmt.setString(2, "x" + inserts);
                 stmt.setString(3, String.valueOf(inserts));