You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2014/10/06 17:57:00 UTC

[6/7] git commit: PHOENIX-1257 Upserted data seen by SELECT in UPSERT SELECT execution (Lars Hofhansl)

PHOENIX-1257 Upserted data seen by SELECT in UPSERT SELECT execution (Lars Hofhansl)

Conflicts:
	phoenix-core/src/it/java/org/apache/phoenix/end2end/ReverseScanIT.java
	phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/002a9de6
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/002a9de6
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/002a9de6

Branch: refs/heads/3.0
Commit: 002a9de6a8dc6089ecc8bc27372e2424eccb855f
Parents: f68f52a
Author: James Taylor <jt...@salesforce.com>
Authored: Sun Oct 5 13:26:52 2014 -0700
Committer: James Taylor <jt...@salesforce.com>
Committed: Mon Oct 6 01:42:59 2014 -0700

----------------------------------------------------------------------
 .../phoenix/end2end/CoalesceFunctionIT.java     | 67 ++++++++------------
 ...ipRangeParallelIteratorRegionSplitterIT.java |  3 +-
 .../end2end/TenantSpecificTablesDDLIT.java      |  2 +-
 .../phoenix/end2end/ToCharFunctionIT.java       |  4 +-
 .../phoenix/end2end/ToNumberFunctionIT.java     |  4 +-
 .../end2end/UpsertSelectAutoCommitIT.java       | 23 +++++++
 .../salted/SaltedTableVarLengthRowKeyIT.java    |  8 +--
 .../apache/phoenix/compile/FromCompiler.java    | 32 +++++++---
 .../apache/phoenix/compile/UpsertCompiler.java  | 19 ++++++
 .../apache/phoenix/execute/BaseQueryPlan.java   |  6 --
 10 files changed, 103 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/002a9de6/phoenix-core/src/it/java/org/apache/phoenix/end2end/CoalesceFunctionIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CoalesceFunctionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CoalesceFunctionIT.java
index 57599e6..45fcb48 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CoalesceFunctionIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CoalesceFunctionIT.java
@@ -67,7 +67,7 @@ public class CoalesceFunctionIT extends BaseHBaseManagedTimeIT {
     public void coalesceWithSumExplicitLong() throws Exception {
         Connection conn = DriverManager.getConnection(getUrl());
 
-        String ddl = "CREATE TABLE IF NOT EXISTS TEST_COALESCE("
+        String ddl = "CREATE TABLE TEST_COALESCE("
                 + "    ID BIGINT NOT NULL, "
                 + "    COUNT BIGINT "
                 + "    CONSTRAINT pk PRIMARY KEY(ID))";
@@ -91,7 +91,7 @@ public class CoalesceFunctionIT extends BaseHBaseManagedTimeIT {
     public void coalesceWithSumImplicitLong() throws Exception {
         Connection conn = DriverManager.getConnection(getUrl());
 
-        String ddl = "CREATE TABLE IF NOT EXISTS TEST_COALESCE("
+        String ddl = "CREATE TABLE TEST_COALESCE("
                 + "    ID BIGINT NOT NULL, "
                 + "    COUNT BIGINT "
                 + "    CONSTRAINT pk PRIMARY KEY(ID))";
@@ -115,7 +115,7 @@ public class CoalesceFunctionIT extends BaseHBaseManagedTimeIT {
     public void coalesceWithSecondParamAsExpression() throws Exception {
         Connection conn = DriverManager.getConnection(getUrl());
 
-        String ddl = "CREATE TABLE IF NOT EXISTS TEST_COALESCE("
+        String ddl = "CREATE TABLE TEST_COALESCE("
                 + "    ID BIGINT NOT NULL, "
                 + "    COUNT BIGINT "
                 + "    CONSTRAINT pk PRIMARY KEY(ID))";
@@ -139,7 +139,7 @@ public class CoalesceFunctionIT extends BaseHBaseManagedTimeIT {
     public void nonTypedSecondParameterLong() throws Exception {
         Connection conn = DriverManager.getConnection(getUrl());
 
-        String ddl = "CREATE TABLE IF NOT EXISTS TEST_COALESCE("
+        String ddl = "CREATE TABLE TEST_COALESCE("
                 + "    ID BIGINT NOT NULL, "
                 + "    COUNT BIGINT " //first parameter to coalesce
                 + "    CONSTRAINT pk PRIMARY KEY(ID))";
@@ -163,47 +163,32 @@ public class CoalesceFunctionIT extends BaseHBaseManagedTimeIT {
     public void nonTypedSecondParameterUnsignedDataTypes() throws Exception {
         Connection conn = DriverManager.getConnection(getUrl());
 
-        String[] dataTypes = {
-            "UNSIGNED_INT",
-            "UNSIGNED_LONG",
-            "UNSIGNED_TINYINT",
-            "UNSIGNED_SMALLINT",
-            "UNSIGNED_FLOAT",
-            "UNSIGNED_DOUBLE",
-            "UNSIGNED_TIME",
-            "UNSIGNED_DATE",
-            "UNSIGNED_TIMESTAMP"
-        };
-
-        for (String dataType : dataTypes) {
-
-            String ddl = "CREATE TABLE IF NOT EXISTS TEST_COALESCE("
-                    + "    ID BIGINT NOT NULL, "
-                    + "    COUNT " + dataType //first parameter to coalesce
-                    + "    CONSTRAINT pk PRIMARY KEY(ID))";
-            conn.createStatement().execute(ddl);
-
-            conn.createStatement().execute("UPSERT INTO TEST_COALESCE(ID, COUNT) VALUES(2, null)");
-            conn.commit();
-
-            //second param to coalesce is signed int
-            ResultSet rs = conn.createStatement().executeQuery(
-                    "SELECT "
-                    + "COALESCE(NTH_VALUE(COUNT, 100) WITHIN GROUP (ORDER BY COUNT DESC), 1) "
-                    + "FROM TEST_COALESCE "
-                    + "GROUP BY ID");
+        String ddl = "CREATE TABLE TEST_COALESCE ("
+                + "    ID BIGINT NOT NULL, "
+                + "    COUNT UNSIGNED_INT " //first parameter to coalesce
+                + "    CONSTRAINT pk PRIMARY KEY(ID))";
+        conn.createStatement().execute(ddl);
 
-            assertTrue(rs.next());
-            assertEquals(1, rs.getInt(1));
-            assertFalse(rs.wasNull());
-        }
+        conn.createStatement().execute("UPSERT INTO TEST_COALESCE (ID, COUNT) VALUES(2, null)");
+        conn.commit();
+
+        //second param to coalesce is signed int
+        ResultSet rs = conn.createStatement().executeQuery(
+                "SELECT "
+                + " COALESCE(NTH_VALUE(COUNT, 100) WITHIN GROUP (ORDER BY COUNT DESC), 1) "
+                + " FROM TEST_COALESCE" 
+                + " GROUP BY ID");
+
+        assertTrue(rs.next());
+        assertEquals(1, rs.getInt(1));
+        assertFalse(rs.wasNull());
     }
 
     @Test
     public void testWithNthValueAggregationFunction() throws Exception {
         Connection conn = DriverManager.getConnection(getUrl());
 
-        String ddl = "CREATE TABLE IF NOT EXISTS TEST_NTH("
+        String ddl = "CREATE TABLE TEST_NTH("
                 + "    ID BIGINT NOT NULL, "
                 + "    DATE TIMESTAMP NOT NULL, "
                 + "    COUNT BIGINT "
@@ -234,7 +219,7 @@ public class CoalesceFunctionIT extends BaseHBaseManagedTimeIT {
     public void wrongDataTypeOfSecondParameter() throws Exception {
         Connection conn = DriverManager.getConnection(getUrl());
 
-        String ddl = "CREATE TABLE IF NOT EXISTS TEST_COALESCE("
+        String ddl = "CREATE TABLE TEST_COALESCE("
                 + "    ID UNSIGNED_INT NOT NULL, "
                 + "    COUNT UNSIGNED_INT "
                 + "    CONSTRAINT pk PRIMARY KEY(ID))";
@@ -260,7 +245,7 @@ public class CoalesceFunctionIT extends BaseHBaseManagedTimeIT {
     public void testImplicitSecondArgCastingException() throws Exception {
         Connection conn = DriverManager.getConnection(getUrl());
 
-        String ddl = "CREATE TABLE IF NOT EXISTS TEST_COALESCE("
+        String ddl = "CREATE TABLE TEST_COALESCE("
                 + "    ID INTEGER NOT NULL, "
                 + "    COUNT UNSIGNED_INT " //first parameter to coalesce
                 + "    CONSTRAINT pk PRIMARY KEY(ID))";
@@ -288,7 +273,7 @@ public class CoalesceFunctionIT extends BaseHBaseManagedTimeIT {
     public void testImplicitSecondArgCasting() throws Exception {
         Connection conn = DriverManager.getConnection(getUrl());
 
-        String ddl = "CREATE TABLE IF NOT EXISTS TEST_COALESCE("
+        String ddl = "CREATE TABLE TEST_COALESCE("
                 + "    ID DOUBLE NOT NULL, "
                 + "    COUNT INTEGER " //first parameter to coalesce
                 + "    CONSTRAINT pk PRIMARY KEY(ID))";

http://git-wip-us.apache.org/repos/asf/phoenix/blob/002a9de6/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipRangeParallelIteratorRegionSplitterIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipRangeParallelIteratorRegionSplitterIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipRangeParallelIteratorRegionSplitterIT.java
index 18d7910..a760f74 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipRangeParallelIteratorRegionSplitterIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipRangeParallelIteratorRegionSplitterIT.java
@@ -357,7 +357,8 @@ public class SkipRangeParallelIteratorRegionSplitterIT extends BaseClientManaged
             }
             
         };
-        PhoenixConnection connection = DriverManager.getConnection(getUrl(), PropertiesUtil.deepCopy(TEST_PROPERTIES)).unwrap(PhoenixConnection.class);
+        String url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" + nextTimestamp();
+        PhoenixConnection connection = DriverManager.getConnection(url, PropertiesUtil.deepCopy(TEST_PROPERTIES)).unwrap(PhoenixConnection.class);
         final PhoenixStatement statement = new PhoenixStatement(connection);
         final StatementContext context = new StatementContext(statement, resolver, scan, new SequenceManager(statement));
         context.setScanRanges(scanRanges);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/002a9de6/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDDLIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDDLIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDDLIT.java
index 2d6c30b..0e2b75c 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDDLIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDDLIT.java
@@ -506,7 +506,7 @@ public class TenantSpecificTablesDDLIT extends BaseTenantSpecificTablesIT {
             conn.close();
             
             // Global connection sees all tenant tables
-            conn = DriverManager.getConnection(getUrl());
+            conn = DriverManager.getConnection(getUrl(), props);
             rs = conn.getMetaData().getSuperTables(TENANT_ID, null, null);
             assertTrue(rs.next());
             assertEquals(TENANT_ID, rs.getString(PhoenixDatabaseMetaData.TABLE_CAT));

http://git-wip-us.apache.org/repos/asf/phoenix/blob/002a9de6/phoenix-core/src/it/java/org/apache/phoenix/end2end/ToCharFunctionIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ToCharFunctionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ToCharFunctionIT.java
index 10c34ee..1fa88f5 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ToCharFunctionIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ToCharFunctionIT.java
@@ -218,7 +218,9 @@ public class ToCharFunctionIT extends BaseClientManagedTimeIT {
     }
     
     private void runOneRowQueryTest(String oneRowQuery, Integer pkValue, String projectedValue) throws Exception {
-        Connection conn = DriverManager.getConnection(getUrl());
+        long ts = nextTimestamp();
+        String url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" + ts;
+        Connection conn = DriverManager.getConnection(url);
         try {
             PreparedStatement statement = conn.prepareStatement(oneRowQuery);
             ResultSet rs = statement.executeQuery();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/002a9de6/phoenix-core/src/it/java/org/apache/phoenix/end2end/ToNumberFunctionIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ToNumberFunctionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ToNumberFunctionIT.java
index 9f415cf..431cbda 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ToNumberFunctionIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ToNumberFunctionIT.java
@@ -284,7 +284,9 @@ public class ToNumberFunctionIT extends BaseClientManagedTimeIT {
     }
     
     private void runOneRowQueryTest(String oneRowQuery, boolean isIntegerColumn, Integer expectedIntValue, BigDecimal expectedDecimalValue) throws Exception {
-        Connection conn = DriverManager.getConnection(getUrl());
+        long ts = nextTimestamp();
+        String url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" + ts;
+        Connection conn = DriverManager.getConnection(url);
         try {
             PreparedStatement statement = conn.prepareStatement(oneRowQuery);
             ResultSet rs = statement.executeQuery();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/002a9de6/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectAutoCommitIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectAutoCommitIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectAutoCommitIT.java
index 86a52d0..e39619c 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectAutoCommitIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectAutoCommitIT.java
@@ -29,8 +29,10 @@ import java.sql.Date;
 import java.sql.DriverManager;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
+import java.sql.Statement;
 import java.util.Properties;
 
+import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -135,4 +137,25 @@ public class UpsertSelectAutoCommitIT extends BaseHBaseManagedTimeIT {
         conn.commit();
     }
     
+    
+    @Test
+    public void testUpsertSelectDoesntSeeUpsertedData() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        props.setProperty(QueryServices.MUTATE_BATCH_SIZE_ATTRIB, Integer.toString(3));
+        props.setProperty(QueryServices.SCAN_CACHE_SIZE_ATTRIB, Integer.toString(3));
+        props.setProperty(QueryServices.SCAN_RESULT_CHUNK_SIZE, Integer.toString(3));
+        Connection conn = DriverManager.getConnection(getUrl(), props);
+        conn.setAutoCommit(true);
+        conn.createStatement().execute("CREATE SEQUENCE keys");
+        conn.createStatement().execute("CREATE TABLE foo (pk INTEGER PRIMARY KEY, val INTEGER)");
+
+        conn.createStatement().execute("UPSERT INTO foo VALUES (NEXT VALUE FOR keys,1)");
+        for (int i=0; i<6; i++) {
+            Statement stmt = conn.createStatement();
+            int upsertCount = stmt.executeUpdate("UPSERT INTO foo SELECT NEXT VALUE FOR keys, val FROM foo");
+            assertEquals((int)Math.pow(2, i), upsertCount);
+        }
+        conn.close();
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/002a9de6/phoenix-core/src/it/java/org/apache/phoenix/end2end/salted/SaltedTableVarLengthRowKeyIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/salted/SaltedTableVarLengthRowKeyIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/salted/SaltedTableVarLengthRowKeyIT.java
index ae696eb..db517a6 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/salted/SaltedTableVarLengthRowKeyIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/salted/SaltedTableVarLengthRowKeyIT.java
@@ -29,14 +29,14 @@ import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.util.Properties;
 
-import org.apache.phoenix.end2end.BaseClientManagedTimeIT;
-import org.apache.phoenix.end2end.ClientManagedTimeTest;
+import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT;
+import org.apache.phoenix.end2end.HBaseManagedTimeTest;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-@Category(ClientManagedTimeTest.class)
-public class SaltedTableVarLengthRowKeyIT extends BaseClientManagedTimeIT {
+@Category(HBaseManagedTimeTest.class)
+public class SaltedTableVarLengthRowKeyIT extends BaseHBaseManagedTimeIT {
 
     private static void initTableValues() throws Exception {
         Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/002a9de6/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
index d5cd240..213fd75 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
@@ -155,9 +155,9 @@ public class FromCompiler {
     		throws SQLException {
     	List<TableNode> fromNodes = statement.getFrom();
         if (!statement.isJoin() && fromNodes.get(0) instanceof NamedTableNode)
-            return new SingleTableColumnResolver(connection, (NamedTableNode) fromNodes.get(0), true);
+            return new SingleTableColumnResolver(connection, (NamedTableNode) fromNodes.get(0), true, 1);
         
-        MultiTableColumnResolver visitor = new MultiTableColumnResolver(connection);
+        MultiTableColumnResolver visitor = new MultiTableColumnResolver(connection, 1);
         for (TableNode node : fromNodes) {
             node.accept(visitor);
         }
@@ -186,11 +186,11 @@ public class FromCompiler {
     }
     
     private static class SingleTableColumnResolver extends BaseColumnResolver {
-        	private final List<TableRef> tableRefs;
-        	private final String alias;
+    	private final List<TableRef> tableRefs;
+    	private final String alias;
     	
        public SingleTableColumnResolver(PhoenixConnection connection, NamedTableNode table, long timeStamp) throws SQLException  {
-           super(connection);
+           super(connection, 0);
            List<PColumnFamily> families = Lists.newArrayListWithExpectedSize(table.getDynamicColumns().size());
            for (ColumnDef def : table.getDynamicColumns()) {
                if (def.getColumnDefName().getFamilyName() != null) {
@@ -205,13 +205,17 @@ public class FromCompiler {
        }
        
         public SingleTableColumnResolver(PhoenixConnection connection, NamedTableNode tableNode, boolean updateCacheImmediately) throws SQLException {
-            super(connection);
+            this(connection, tableNode, updateCacheImmediately, 0);
+        }
+
+        public SingleTableColumnResolver(PhoenixConnection connection, NamedTableNode tableNode, boolean updateCacheImmediately, int tsAddition) throws SQLException {
+            super(connection, tsAddition);
             alias = tableNode.getAlias();
             TableRef tableRef = createTableRef(tableNode, updateCacheImmediately);
             tableRefs = ImmutableList.of(tableRef);
         }
 
-		@Override
+        @Override
 		public List<TableRef> getTables() {
 			return tableRefs;
 		}
@@ -272,10 +276,15 @@ public class FromCompiler {
     private static abstract class BaseColumnResolver implements ColumnResolver {
         protected final PhoenixConnection connection;
         protected final MetaDataClient client;
+        // Fudge factor added to the current time we calculate. We need this when we do a SELECT
+        // on Windows because the millis timestamp granularity is so coarse that we sometimes
+        // won't get back the data that we just upserted.
+        private final int tsAddition;
         
-        private BaseColumnResolver(PhoenixConnection connection) {
+        private BaseColumnResolver(PhoenixConnection connection, int tsAddition) {
         	this.connection = connection;
             this.client = new MetaDataClient(connection);
+            this.tsAddition = tsAddition;
         }
 
         protected TableRef createTableRef(NamedTableNode tableNode, boolean updateCacheImmediately) throws SQLException {
@@ -318,6 +327,9 @@ public class FromCompiler {
             // Add any dynamic columns to the table declaration
             List<ColumnDef> dynamicColumns = tableNode.getDynamicColumns();
             theTable = addDynamicColumns(dynamicColumns, theTable);
+            if (timeStamp != QueryConstants.UNSET_TIMESTAMP) {
+                timeStamp += tsAddition;
+            }
             TableRef tableRef = new TableRef(tableNode.getAlias(), theTable, timeStamp, !dynamicColumns.isEmpty());
             if (logger.isDebugEnabled() && timeStamp != QueryConstants.UNSET_TIMESTAMP) {
                 logger.debug("Re-resolved stale table " + fullTableName + " with seqNum " + tableRef.getTable().getSequenceNumber() + " at timestamp " + tableRef.getTable().getTimeStamp() + " with " + tableRef.getTable().getColumns().size() + " columns: " + tableRef.getTable().getColumns());
@@ -358,8 +370,8 @@ public class FromCompiler {
         private final ListMultimap<String, TableRef> tableMap;
         private final List<TableRef> tables;
 
-        private MultiTableColumnResolver(PhoenixConnection connection) {
-        	super(connection);
+        private MultiTableColumnResolver(PhoenixConnection connection, int tsAddition) {
+        	super(connection, tsAddition);
             tableMap = ArrayListMultimap.<String, TableRef> create();
             tables = Lists.newArrayList();
         }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/002a9de6/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
index add8e61..9f674f1 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
@@ -374,6 +374,7 @@ public class UpsertCompiler {
                     select = prependTenantAndViewConstants(table, select, tenantId, addViewColumnsToBe);
                     sameTable = select.getFrom().size() == 1
                         && tableRefToBe.equals(selectResolver.getTables().get(0));
+                    tableRefToBe = adjustTimestampToMinOfSameTable(tableRefToBe, selectResolver.getTables());
                     /* We can run the upsert in a coprocessor if:
                      * 1) from has only 1 table and the into table matches from table
                      * 2) the select query isn't doing aggregation (which requires a client-side final merge)
@@ -797,6 +798,24 @@ public class UpsertCompiler {
         };
     }
     
+    private TableRef adjustTimestampToMinOfSameTable(TableRef upsertRef, List<TableRef> selectRefs) {
+        long minTimestamp = Long.MAX_VALUE;
+        for (TableRef selectRef : selectRefs) {
+            if (selectRef.equals(upsertRef)) {
+                minTimestamp = Math.min(minTimestamp, selectRef.getTimeStamp());
+            }
+        }
+        if (minTimestamp != Long.MAX_VALUE) {
+            // If the table being upserted into is also one of the tables being selected from,
+            // reset the timestamp of the upsert (which controls the Put timestamp)
+            // to the lowest timestamp we found to ensure that the SELECT
+            // will not see the data being upserted. This prevents infinite loops
+            // like the one in PHOENIX-1257.
+            return new TableRef(upsertRef, minTimestamp);
+        }
+        return upsertRef;
+    }
+
     private static final class UpsertValuesCompiler extends ExpressionCompiler {
         private PColumn column;
         

http://git-wip-us.apache.org/repos/asf/phoenix/blob/002a9de6/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
index eb43aa4..3a59828 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
@@ -23,7 +23,6 @@ import java.sql.SQLException;
 import java.util.Collections;
 import java.util.List;
 
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.phoenix.compile.ExplainPlan;
 import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
@@ -144,11 +143,6 @@ public abstract class BaseQueryPlan implements QueryPlan {
           Long scn = connection.getSCN();
           if (scn == null) {
             scn = context.getCurrentTime();
-            // Add one to server time since max of time range is exclusive
-            // and we need to account of OSs with lower resolution clocks.
-            if (scn < HConstants.LATEST_TIMESTAMP) {
-              scn++;
-            }
           }
           ScanUtil.setTimeRange(scan, scn);
         } else {