You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2015/06/05 01:46:20 UTC
phoenix git commit: Updating pom to use 0.5.1-SNAPSHOT and removing
hack to prevent reading your own writes
Repository: phoenix
Updated Branches:
refs/heads/txn 6f8f39436 -> 716307f41
Updating pom to use 0.5.1-SNAPSHOT and removing hack to prevent reading your own writes
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/716307f4
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/716307f4
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/716307f4
Branch: refs/heads/txn
Commit: 716307f414ac96fff925ed4e682428152a1dc32b
Parents: 6f8f394
Author: James Taylor <ja...@apache.org>
Authored: Thu Jun 4 16:46:04 2015 -0700
Committer: James Taylor <ja...@apache.org>
Committed: Thu Jun 4 16:46:04 2015 -0700
----------------------------------------------------------------------
phoenix-core/pom.xml | 8 ++---
.../end2end/index/BaseMutableIndexIT.java | 16 ++++-----
.../apache/phoenix/execute/MutationState.java | 9 ++++-
.../index/PhoenixTransactionalIndexer.java | 38 +++++---------------
4 files changed, 29 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/716307f4/phoenix-core/pom.xml
----------------------------------------------------------------------
diff --git a/phoenix-core/pom.xml b/phoenix-core/pom.xml
index 3fcf339..7ca5d1e 100644
--- a/phoenix-core/pom.xml
+++ b/phoenix-core/pom.xml
@@ -224,24 +224,24 @@
<dependency>
<groupId>co.cask.tephra</groupId>
<artifactId>tephra-api</artifactId>
- <version>0.4.2-SNAPSHOT</version>
+ <version>0.5.1-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>co.cask.tephra</groupId>
<artifactId>tephra-core</artifactId>
- <version>0.4.2-SNAPSHOT</version>
+ <version>0.5.1-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>co.cask.tephra</groupId>
<artifactId>tephra-core</artifactId>
<type>test-jar</type>
- <version>0.4.2-SNAPSHOT</version>
+ <version>0.5.1-SNAPSHOT</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>co.cask.tephra</groupId>
<artifactId>tephra-hbase-compat-0.98</artifactId>
- <version>0.4.2-SNAPSHOT</version>
+ <version>0.5.1-SNAPSHOT</version>
</dependency>
<!-- Make sure we have all the antlr dependencies -->
http://git-wip-us.apache.org/repos/asf/phoenix/blob/716307f4/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/BaseMutableIndexIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/BaseMutableIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/BaseMutableIndexIT.java
index 2676548..000787b 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/BaseMutableIndexIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/BaseMutableIndexIT.java
@@ -1131,19 +1131,19 @@ public abstract class BaseMutableIndexIT extends BaseHBaseManagedTimeIT {
stmt.executeUpdate("upsert into DEMO values('cc1', 1.23, 'abc')");
conn.commit();
- //assert values in index table
- rs = stmt.executeQuery("select * from DEMO_IDX");
+ //assert values in data table
+ rs = stmt.executeQuery("select /*+ NO_INDEX */ v1, v2, v3 from DEMO");
assertTrue(rs.next());
- assertEquals(0, Doubles.compare(1.23, rs.getDouble(1)));
- assertEquals("cc1", rs.getString(2));
+ assertEquals("cc1", rs.getString(1));
+ assertEquals(0, Doubles.compare(1.23, rs.getDouble(2)));
assertEquals("abc", rs.getString(3));
assertFalse(rs.next());
- //assert values in data table
- rs = stmt.executeQuery("select v1, v2, v3 from DEMO");
+ //assert values in index table
+ rs = stmt.executeQuery("select * from DEMO_IDX");
assertTrue(rs.next());
- assertEquals("cc1", rs.getString(1));
- assertEquals(0, Doubles.compare(1.23, rs.getDouble(2)));
+ assertEquals(0, Doubles.compare(1.23, rs.getDouble(1)));
+ assertEquals("cc1", rs.getString(2));
assertEquals("abc", rs.getString(3));
assertFalse(rs.next());
http://git-wip-us.apache.org/repos/asf/phoenix/blob/716307f4/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index 256e487..562d055 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -510,10 +510,12 @@ public class MutationState implements SQLCloseable {
private void send(Iterator<TableRef> tableRefIterator) throws SQLException {
int i = 0;
long[] serverTimeStamps = null;
+ boolean sendAll = false;
// Validate up front if not transactional so that we
if (tableRefIterator == null) {
serverTimeStamps = validateAll();
tableRefIterator = mutations.keySet().iterator();
+ sendAll = true;
}
// add tracing for this operation
@@ -635,7 +637,12 @@ public class MutationState implements SQLCloseable {
if (tableRef.getTable().getType() != PTableType.INDEX) {
numRows -= valuesMap.size();
}
- tableRefIterator.remove(); // Remove batches as we process them
+ // Remove batches as we process them
+ if (sendAll) {
+ tableRefIterator.remove(); // Iterating through actual map in this case
+ } else {
+ mutations.remove(tableRef);
+ }
}
trace.close();
assert(numRows==0);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/716307f4/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
index 39f885b..3d5da43 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
@@ -20,6 +20,7 @@ import java.util.Map;
import java.util.Set;
import co.cask.tephra.Transaction;
+import co.cask.tephra.Transaction.VisibilityLevel;
import co.cask.tephra.TxConstants;
import co.cask.tephra.hbase98.TransactionAwareHTable;
@@ -134,9 +135,14 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
return;
}
- boolean readOwnWrites = m.getAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY) != null;
Map<String,byte[]> updateAttributes = m.getAttributesMap();
PhoenixIndexMetaData indexMetaData = new PhoenixIndexMetaData(c.getEnvironment(),updateAttributes);
+ if (m.getAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY) == null) {
+ // Unless we're aborting the transaction, we do not want to see our own transaction writes,
+ // since index maintenance requires seeing the previously committed data in order to function
+ // properly.
+ indexMetaData.getTransaction().setVisibility(VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT);
+ }
Collection<Pair<Mutation, byte[]>> indexUpdates = null;
// get the current span, or just use a null-span to avoid a bunch of if statements
try (TraceScope scope = Trace.startSpan("Starting to build index updates")) {
@@ -146,7 +152,7 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
}
// get the index updates for all elements in this batch
- indexUpdates = getIndexUpdates(c.getEnvironment(), indexMetaData, getMutationIterator(miniBatchOp), readOwnWrites);
+ indexUpdates = getIndexUpdates(c.getEnvironment(), indexMetaData, getMutationIterator(miniBatchOp));
current.addTimelineAnnotation("Built index updates, doing preStep");
TracingUtils.addAnnotation(current, "index update count", indexUpdates.size());
@@ -162,30 +168,7 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
}
}
- private static final String TX_NO_READ_OWN_WRITES = "TX_NO_READ_OWN_WRITES";
- @Override
- public RegionScanner preScannerOpen(ObserverContext<RegionCoprocessorEnvironment> e, Scan scan, RegionScanner s) {
- /*
- * TODO: remove once Tephra gives us a way to not read our own writes.
- * Hack to force scan not to read their own writes. Since the mutations have already been
- * applied by the time the preBatchMutate hook is called, we need to adjust the max time
- * range down by one to prevent us from seeing the current state. Instead, we need to
- * see the state right before our Puts have been applied.
- */
- byte[] encoded = scan.getAttribute(TX_NO_READ_OWN_WRITES);
- if (encoded != null) {
- TimeRange range = scan.getTimeRange();
- long maxTime = range.getMax();
- try {
- scan.setTimeRange(range.getMin(), maxTime == Long.MAX_VALUE ? maxTime : maxTime-1);
- } catch (IOException e1) {
- throw new RuntimeException(e1);
- }
- }
- return s;
- }
-
- private Collection<Pair<Mutation, byte[]>> getIndexUpdates(RegionCoprocessorEnvironment env, PhoenixIndexMetaData indexMetaData, Iterator<Mutation> mutationIterator, boolean readOwnWrites) throws IOException {
+ private Collection<Pair<Mutation, byte[]>> getIndexUpdates(RegionCoprocessorEnvironment env, PhoenixIndexMetaData indexMetaData, Iterator<Mutation> mutationIterator) throws IOException {
ResultScanner scanner = null;
TransactionAwareHTable txTable = null;
@@ -230,9 +213,6 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
keys.add(PVarbinary.INSTANCE.getKeyRange(ptr.copyBytesIfNecessary()));
}
Scan scan = new Scan();
- if (!readOwnWrites) {
- scan.setAttribute(TX_NO_READ_OWN_WRITES, PDataType.TRUE_BYTES); // TODO: remove when Tephra allows this
- }
// Project all mutable columns
for (ColumnReference ref : mutableColumns) {
scan.addColumn(ref.getFamily(), ref.getQualifier());