You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2014/10/06 03:35:23 UTC
[5/5] git commit: PHOENIX-1257 Upserted data seen by SELECT in UPSERT
SELECT execution (Lars Hofhansl)
PHOENIX-1257 Upserted data seen by SELECT in UPSERT SELECT execution (Lars Hofhansl)
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/e49e8dcf
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/e49e8dcf
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/e49e8dcf
Branch: refs/heads/4.0
Commit: e49e8dcfbed740e13515c0b9aaf79db602059fd4
Parents: c9101f8
Author: James Taylor <jt...@salesforce.com>
Authored: Sun Oct 5 13:26:52 2014 -0700
Committer: James Taylor <jt...@salesforce.com>
Committed: Sun Oct 5 18:11:37 2014 -0700
----------------------------------------------------------------------
.../phoenix/end2end/CoalesceFunctionIT.java | 67 ++++++++------------
.../apache/phoenix/end2end/ReverseScanIT.java | 2 +-
...ipRangeParallelIteratorRegionSplitterIT.java | 3 +-
.../end2end/TenantSpecificTablesDDLIT.java | 2 +-
.../phoenix/end2end/ToCharFunctionIT.java | 4 +-
.../phoenix/end2end/ToNumberFunctionIT.java | 4 +-
.../end2end/UpsertSelectAutoCommitIT.java | 23 +++++++
.../salted/SaltedTableVarLengthRowKeyIT.java | 8 +--
.../apache/phoenix/compile/FromCompiler.java | 32 +++++++---
.../apache/phoenix/compile/UpsertCompiler.java | 19 ++++++
.../apache/phoenix/execute/BaseQueryPlan.java | 6 --
11 files changed, 104 insertions(+), 66 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e49e8dcf/phoenix-core/src/it/java/org/apache/phoenix/end2end/CoalesceFunctionIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CoalesceFunctionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CoalesceFunctionIT.java
index 57599e6..45fcb48 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CoalesceFunctionIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CoalesceFunctionIT.java
@@ -67,7 +67,7 @@ public class CoalesceFunctionIT extends BaseHBaseManagedTimeIT {
public void coalesceWithSumExplicitLong() throws Exception {
Connection conn = DriverManager.getConnection(getUrl());
- String ddl = "CREATE TABLE IF NOT EXISTS TEST_COALESCE("
+ String ddl = "CREATE TABLE TEST_COALESCE("
+ " ID BIGINT NOT NULL, "
+ " COUNT BIGINT "
+ " CONSTRAINT pk PRIMARY KEY(ID))";
@@ -91,7 +91,7 @@ public class CoalesceFunctionIT extends BaseHBaseManagedTimeIT {
public void coalesceWithSumImplicitLong() throws Exception {
Connection conn = DriverManager.getConnection(getUrl());
- String ddl = "CREATE TABLE IF NOT EXISTS TEST_COALESCE("
+ String ddl = "CREATE TABLE TEST_COALESCE("
+ " ID BIGINT NOT NULL, "
+ " COUNT BIGINT "
+ " CONSTRAINT pk PRIMARY KEY(ID))";
@@ -115,7 +115,7 @@ public class CoalesceFunctionIT extends BaseHBaseManagedTimeIT {
public void coalesceWithSecondParamAsExpression() throws Exception {
Connection conn = DriverManager.getConnection(getUrl());
- String ddl = "CREATE TABLE IF NOT EXISTS TEST_COALESCE("
+ String ddl = "CREATE TABLE TEST_COALESCE("
+ " ID BIGINT NOT NULL, "
+ " COUNT BIGINT "
+ " CONSTRAINT pk PRIMARY KEY(ID))";
@@ -139,7 +139,7 @@ public class CoalesceFunctionIT extends BaseHBaseManagedTimeIT {
public void nonTypedSecondParameterLong() throws Exception {
Connection conn = DriverManager.getConnection(getUrl());
- String ddl = "CREATE TABLE IF NOT EXISTS TEST_COALESCE("
+ String ddl = "CREATE TABLE TEST_COALESCE("
+ " ID BIGINT NOT NULL, "
+ " COUNT BIGINT " //first parameter to coalesce
+ " CONSTRAINT pk PRIMARY KEY(ID))";
@@ -163,47 +163,32 @@ public class CoalesceFunctionIT extends BaseHBaseManagedTimeIT {
public void nonTypedSecondParameterUnsignedDataTypes() throws Exception {
Connection conn = DriverManager.getConnection(getUrl());
- String[] dataTypes = {
- "UNSIGNED_INT",
- "UNSIGNED_LONG",
- "UNSIGNED_TINYINT",
- "UNSIGNED_SMALLINT",
- "UNSIGNED_FLOAT",
- "UNSIGNED_DOUBLE",
- "UNSIGNED_TIME",
- "UNSIGNED_DATE",
- "UNSIGNED_TIMESTAMP"
- };
-
- for (String dataType : dataTypes) {
-
- String ddl = "CREATE TABLE IF NOT EXISTS TEST_COALESCE("
- + " ID BIGINT NOT NULL, "
- + " COUNT " + dataType //first parameter to coalesce
- + " CONSTRAINT pk PRIMARY KEY(ID))";
- conn.createStatement().execute(ddl);
-
- conn.createStatement().execute("UPSERT INTO TEST_COALESCE(ID, COUNT) VALUES(2, null)");
- conn.commit();
-
- //second param to coalesce is signed int
- ResultSet rs = conn.createStatement().executeQuery(
- "SELECT "
- + "COALESCE(NTH_VALUE(COUNT, 100) WITHIN GROUP (ORDER BY COUNT DESC), 1) "
- + "FROM TEST_COALESCE "
- + "GROUP BY ID");
+ String ddl = "CREATE TABLE TEST_COALESCE ("
+ + " ID BIGINT NOT NULL, "
+ + " COUNT UNSIGNED_INT " //first parameter to coalesce
+ + " CONSTRAINT pk PRIMARY KEY(ID))";
+ conn.createStatement().execute(ddl);
- assertTrue(rs.next());
- assertEquals(1, rs.getInt(1));
- assertFalse(rs.wasNull());
- }
+ conn.createStatement().execute("UPSERT INTO TEST_COALESCE (ID, COUNT) VALUES(2, null)");
+ conn.commit();
+
+ //second param to coalesce is signed int
+ ResultSet rs = conn.createStatement().executeQuery(
+ "SELECT "
+ + " COALESCE(NTH_VALUE(COUNT, 100) WITHIN GROUP (ORDER BY COUNT DESC), 1) "
+ + " FROM TEST_COALESCE"
+ + " GROUP BY ID");
+
+ assertTrue(rs.next());
+ assertEquals(1, rs.getInt(1));
+ assertFalse(rs.wasNull());
}
@Test
public void testWithNthValueAggregationFunction() throws Exception {
Connection conn = DriverManager.getConnection(getUrl());
- String ddl = "CREATE TABLE IF NOT EXISTS TEST_NTH("
+ String ddl = "CREATE TABLE TEST_NTH("
+ " ID BIGINT NOT NULL, "
+ " DATE TIMESTAMP NOT NULL, "
+ " COUNT BIGINT "
@@ -234,7 +219,7 @@ public class CoalesceFunctionIT extends BaseHBaseManagedTimeIT {
public void wrongDataTypeOfSecondParameter() throws Exception {
Connection conn = DriverManager.getConnection(getUrl());
- String ddl = "CREATE TABLE IF NOT EXISTS TEST_COALESCE("
+ String ddl = "CREATE TABLE TEST_COALESCE("
+ " ID UNSIGNED_INT NOT NULL, "
+ " COUNT UNSIGNED_INT "
+ " CONSTRAINT pk PRIMARY KEY(ID))";
@@ -260,7 +245,7 @@ public class CoalesceFunctionIT extends BaseHBaseManagedTimeIT {
public void testImplicitSecondArgCastingException() throws Exception {
Connection conn = DriverManager.getConnection(getUrl());
- String ddl = "CREATE TABLE IF NOT EXISTS TEST_COALESCE("
+ String ddl = "CREATE TABLE TEST_COALESCE("
+ " ID INTEGER NOT NULL, "
+ " COUNT UNSIGNED_INT " //first parameter to coalesce
+ " CONSTRAINT pk PRIMARY KEY(ID))";
@@ -288,7 +273,7 @@ public class CoalesceFunctionIT extends BaseHBaseManagedTimeIT {
public void testImplicitSecondArgCasting() throws Exception {
Connection conn = DriverManager.getConnection(getUrl());
- String ddl = "CREATE TABLE IF NOT EXISTS TEST_COALESCE("
+ String ddl = "CREATE TABLE TEST_COALESCE("
+ " ID DOUBLE NOT NULL, "
+ " COUNT INTEGER " //first parameter to coalesce
+ " CONSTRAINT pk PRIMARY KEY(ID))";
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e49e8dcf/phoenix-core/src/it/java/org/apache/phoenix/end2end/ReverseScanIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ReverseScanIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ReverseScanIT.java
index f738773..26d6d4b 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ReverseScanIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ReverseScanIT.java
@@ -47,7 +47,7 @@ import org.junit.experimental.categories.Category;
import com.google.common.collect.Maps;
-@Category(ClientManagedTimeTest.class)
+@Category(HBaseManagedTimeTest.class)
public class ReverseScanIT extends BaseHBaseManagedTimeIT {
@BeforeClass
@Shadower(classBeingShadowed = BaseClientManagedTimeIT.class)
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e49e8dcf/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipRangeParallelIteratorRegionSplitterIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipRangeParallelIteratorRegionSplitterIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipRangeParallelIteratorRegionSplitterIT.java
index 18d7910..a760f74 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipRangeParallelIteratorRegionSplitterIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SkipRangeParallelIteratorRegionSplitterIT.java
@@ -357,7 +357,8 @@ public class SkipRangeParallelIteratorRegionSplitterIT extends BaseClientManaged
}
};
- PhoenixConnection connection = DriverManager.getConnection(getUrl(), PropertiesUtil.deepCopy(TEST_PROPERTIES)).unwrap(PhoenixConnection.class);
+ String url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" + nextTimestamp();
+ PhoenixConnection connection = DriverManager.getConnection(url, PropertiesUtil.deepCopy(TEST_PROPERTIES)).unwrap(PhoenixConnection.class);
final PhoenixStatement statement = new PhoenixStatement(connection);
final StatementContext context = new StatementContext(statement, resolver, scan, new SequenceManager(statement));
context.setScanRanges(scanRanges);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e49e8dcf/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDDLIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDDLIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDDLIT.java
index 2d6c30b..0e2b75c 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDDLIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/TenantSpecificTablesDDLIT.java
@@ -506,7 +506,7 @@ public class TenantSpecificTablesDDLIT extends BaseTenantSpecificTablesIT {
conn.close();
// Global connection sees all tenant tables
- conn = DriverManager.getConnection(getUrl());
+ conn = DriverManager.getConnection(getUrl(), props);
rs = conn.getMetaData().getSuperTables(TENANT_ID, null, null);
assertTrue(rs.next());
assertEquals(TENANT_ID, rs.getString(PhoenixDatabaseMetaData.TABLE_CAT));
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e49e8dcf/phoenix-core/src/it/java/org/apache/phoenix/end2end/ToCharFunctionIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ToCharFunctionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ToCharFunctionIT.java
index 10c34ee..1fa88f5 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ToCharFunctionIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ToCharFunctionIT.java
@@ -218,7 +218,9 @@ public class ToCharFunctionIT extends BaseClientManagedTimeIT {
}
private void runOneRowQueryTest(String oneRowQuery, Integer pkValue, String projectedValue) throws Exception {
- Connection conn = DriverManager.getConnection(getUrl());
+ long ts = nextTimestamp();
+ String url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" + ts;
+ Connection conn = DriverManager.getConnection(url);
try {
PreparedStatement statement = conn.prepareStatement(oneRowQuery);
ResultSet rs = statement.executeQuery();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e49e8dcf/phoenix-core/src/it/java/org/apache/phoenix/end2end/ToNumberFunctionIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ToNumberFunctionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ToNumberFunctionIT.java
index 9f415cf..431cbda 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ToNumberFunctionIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ToNumberFunctionIT.java
@@ -284,7 +284,9 @@ public class ToNumberFunctionIT extends BaseClientManagedTimeIT {
}
private void runOneRowQueryTest(String oneRowQuery, boolean isIntegerColumn, Integer expectedIntValue, BigDecimal expectedDecimalValue) throws Exception {
- Connection conn = DriverManager.getConnection(getUrl());
+ long ts = nextTimestamp();
+ String url = getUrl() + ";" + PhoenixRuntime.CURRENT_SCN_ATTRIB + "=" + ts;
+ Connection conn = DriverManager.getConnection(url);
try {
PreparedStatement statement = conn.prepareStatement(oneRowQuery);
ResultSet rs = statement.executeQuery();
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e49e8dcf/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectAutoCommitIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectAutoCommitIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectAutoCommitIT.java
index 86a52d0..e39619c 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectAutoCommitIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectAutoCommitIT.java
@@ -29,8 +29,10 @@ import java.sql.Date;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
+import java.sql.Statement;
import java.util.Properties;
+import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.util.PropertiesUtil;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -135,4 +137,25 @@ public class UpsertSelectAutoCommitIT extends BaseHBaseManagedTimeIT {
conn.commit();
}
+
+ @Test
+ public void testUpsertSelectDoesntSeeUpsertedData() throws Exception {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ props.setProperty(QueryServices.MUTATE_BATCH_SIZE_ATTRIB, Integer.toString(3));
+ props.setProperty(QueryServices.SCAN_CACHE_SIZE_ATTRIB, Integer.toString(3));
+ props.setProperty(QueryServices.SCAN_RESULT_CHUNK_SIZE, Integer.toString(3));
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ conn.setAutoCommit(true);
+ conn.createStatement().execute("CREATE SEQUENCE keys");
+ conn.createStatement().execute("CREATE TABLE foo (pk INTEGER PRIMARY KEY, val INTEGER)");
+
+ conn.createStatement().execute("UPSERT INTO foo VALUES (NEXT VALUE FOR keys,1)");
+ for (int i=0; i<6; i++) {
+ Statement stmt = conn.createStatement();
+ int upsertCount = stmt.executeUpdate("UPSERT INTO foo SELECT NEXT VALUE FOR keys, val FROM foo");
+ assertEquals((int)Math.pow(2, i), upsertCount);
+ }
+ conn.close();
+ }
+
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e49e8dcf/phoenix-core/src/it/java/org/apache/phoenix/end2end/salted/SaltedTableVarLengthRowKeyIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/salted/SaltedTableVarLengthRowKeyIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/salted/SaltedTableVarLengthRowKeyIT.java
index ae696eb..db517a6 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/salted/SaltedTableVarLengthRowKeyIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/salted/SaltedTableVarLengthRowKeyIT.java
@@ -29,14 +29,14 @@ import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.Properties;
-import org.apache.phoenix.end2end.BaseClientManagedTimeIT;
-import org.apache.phoenix.end2end.ClientManagedTimeTest;
+import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT;
+import org.apache.phoenix.end2end.HBaseManagedTimeTest;
import org.apache.phoenix.util.PropertiesUtil;
import org.junit.Test;
import org.junit.experimental.categories.Category;
-@Category(ClientManagedTimeTest.class)
-public class SaltedTableVarLengthRowKeyIT extends BaseClientManagedTimeIT {
+@Category(HBaseManagedTimeTest.class)
+public class SaltedTableVarLengthRowKeyIT extends BaseHBaseManagedTimeIT {
private static void initTableValues() throws Exception {
Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e49e8dcf/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
index 5ee29e2..6f7b006 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/FromCompiler.java
@@ -156,9 +156,9 @@ public class FromCompiler {
throws SQLException {
List<TableNode> fromNodes = statement.getFrom();
if (!statement.isJoin() && fromNodes.get(0) instanceof NamedTableNode)
- return new SingleTableColumnResolver(connection, (NamedTableNode) fromNodes.get(0), true);
+ return new SingleTableColumnResolver(connection, (NamedTableNode) fromNodes.get(0), true, 1);
- MultiTableColumnResolver visitor = new MultiTableColumnResolver(connection);
+ MultiTableColumnResolver visitor = new MultiTableColumnResolver(connection, 1);
for (TableNode node : fromNodes) {
node.accept(visitor);
}
@@ -187,11 +187,11 @@ public class FromCompiler {
}
private static class SingleTableColumnResolver extends BaseColumnResolver {
- private final List<TableRef> tableRefs;
- private final String alias;
+ private final List<TableRef> tableRefs;
+ private final String alias;
public SingleTableColumnResolver(PhoenixConnection connection, NamedTableNode table, long timeStamp) throws SQLException {
- super(connection);
+ super(connection, 0);
List<PColumnFamily> families = Lists.newArrayListWithExpectedSize(table.getDynamicColumns().size());
for (ColumnDef def : table.getDynamicColumns()) {
if (def.getColumnDefName().getFamilyName() != null) {
@@ -206,13 +206,17 @@ public class FromCompiler {
}
public SingleTableColumnResolver(PhoenixConnection connection, NamedTableNode tableNode, boolean updateCacheImmediately) throws SQLException {
- super(connection);
+ this(connection, tableNode, updateCacheImmediately, 0);
+ }
+
+ public SingleTableColumnResolver(PhoenixConnection connection, NamedTableNode tableNode, boolean updateCacheImmediately, int tsAddition) throws SQLException {
+ super(connection, tsAddition);
alias = tableNode.getAlias();
TableRef tableRef = createTableRef(tableNode, updateCacheImmediately);
tableRefs = ImmutableList.of(tableRef);
}
- @Override
+ @Override
public List<TableRef> getTables() {
return tableRefs;
}
@@ -273,10 +277,15 @@ public class FromCompiler {
private static abstract class BaseColumnResolver implements ColumnResolver {
protected final PhoenixConnection connection;
protected final MetaDataClient client;
+ // Fudge factor to add to current time we calculate. We need this when we do a SELECT
+ // on Windows because the millis timestamp granularity is so bad we sometimes won't
+ // get the data back that we just upsert.
+ private final int tsAddition;
- private BaseColumnResolver(PhoenixConnection connection) {
+ private BaseColumnResolver(PhoenixConnection connection, int tsAddition) {
this.connection = connection;
this.client = new MetaDataClient(connection);
+ this.tsAddition = tsAddition;
}
protected TableRef createTableRef(NamedTableNode tableNode, boolean updateCacheImmediately) throws SQLException {
@@ -319,6 +328,9 @@ public class FromCompiler {
// Add any dynamic columns to the table declaration
List<ColumnDef> dynamicColumns = tableNode.getDynamicColumns();
theTable = addDynamicColumns(dynamicColumns, theTable);
+ if (timeStamp != QueryConstants.UNSET_TIMESTAMP) {
+ timeStamp += tsAddition;
+ }
TableRef tableRef = new TableRef(tableNode.getAlias(), theTable, timeStamp, !dynamicColumns.isEmpty());
if (logger.isDebugEnabled() && timeStamp != QueryConstants.UNSET_TIMESTAMP) {
logger.debug(LogUtil.addCustomAnnotations("Re-resolved stale table " + fullTableName + " with seqNum " + tableRef.getTable().getSequenceNumber() + " at timestamp " + tableRef.getTable().getTimeStamp() + " with " + tableRef.getTable().getColumns().size() + " columns: " + tableRef.getTable().getColumns(), connection));
@@ -359,8 +371,8 @@ public class FromCompiler {
private final ListMultimap<String, TableRef> tableMap;
private final List<TableRef> tables;
- private MultiTableColumnResolver(PhoenixConnection connection) {
- super(connection);
+ private MultiTableColumnResolver(PhoenixConnection connection, int tsAddition) {
+ super(connection, tsAddition);
tableMap = ArrayListMultimap.<String, TableRef> create();
tables = Lists.newArrayList();
}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e49e8dcf/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
index 3a48a93..f363bdc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
@@ -374,6 +374,7 @@ public class UpsertCompiler {
select = prependTenantAndViewConstants(table, select, tenantId, addViewColumnsToBe);
sameTable = select.getFrom().size() == 1
&& tableRefToBe.equals(selectResolver.getTables().get(0));
+ tableRefToBe = adjustTimestampToMinOfSameTable(tableRefToBe, selectResolver.getTables());
/* We can run the upsert in a coprocessor if:
* 1) from has only 1 table and the into table matches from table
* 2) the select query isn't doing aggregation (which requires a client-side final merge)
@@ -797,6 +798,24 @@ public class UpsertCompiler {
};
}
+ private TableRef adjustTimestampToMinOfSameTable(TableRef upsertRef, List<TableRef> selectRefs) {
+ long minTimestamp = Long.MAX_VALUE;
+ for (TableRef selectRef : selectRefs) {
+ if (selectRef.equals(upsertRef)) {
+ minTimestamp = Math.min(minTimestamp, selectRef.getTimeStamp());
+ }
+ }
+ if (minTimestamp != Long.MAX_VALUE) {
+ // If we found the same table is selected from that is being upserted to,
+ // reset the timestamp of the upsert (which controls the Put timestamp)
+ // to the lowest timestamp we found to ensure that the data being selected
+ // will not see the data being upserted. This prevents infinite loops
+ // like the one in PHOENIX-1257.
+ return new TableRef(upsertRef, minTimestamp);
+ }
+ return upsertRef;
+ }
+
private static final class UpsertValuesCompiler extends ExpressionCompiler {
private PColumn column;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e49e8dcf/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
index d35ee8d..9a3e399 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/BaseQueryPlan.java
@@ -28,7 +28,6 @@ import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
@@ -177,11 +176,6 @@ public abstract class BaseQueryPlan implements QueryPlan {
Long scn = connection.getSCN();
if (scn == null) {
scn = context.getCurrentTime();
- // Add one to server time since max of time range is exclusive
- // and we need to account of OSs with lower resolution clocks.
- if (scn < HConstants.LATEST_TIMESTAMP) {
- scn++;
- }
}
ScanUtil.setTimeRange(scan, scn);
} else {