You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2015/06/05 01:46:20 UTC

phoenix git commit: Updating pom to use 0.5.1-SNAPSHOT and removing hack to prevent reading your own writes

Repository: phoenix
Updated Branches:
  refs/heads/txn 6f8f39436 -> 716307f41


Updating pom to use 0.5.1-SNAPSHOT and removing hack to prevent reading your own writes


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/716307f4
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/716307f4
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/716307f4

Branch: refs/heads/txn
Commit: 716307f414ac96fff925ed4e682428152a1dc32b
Parents: 6f8f394
Author: James Taylor <ja...@apache.org>
Authored: Thu Jun 4 16:46:04 2015 -0700
Committer: James Taylor <ja...@apache.org>
Committed: Thu Jun 4 16:46:04 2015 -0700

----------------------------------------------------------------------
 phoenix-core/pom.xml                            |  8 ++---
 .../end2end/index/BaseMutableIndexIT.java       | 16 ++++-----
 .../apache/phoenix/execute/MutationState.java   |  9 ++++-
 .../index/PhoenixTransactionalIndexer.java      | 38 +++++---------------
 4 files changed, 29 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/716307f4/phoenix-core/pom.xml
----------------------------------------------------------------------
diff --git a/phoenix-core/pom.xml b/phoenix-core/pom.xml
index 3fcf339..7ca5d1e 100644
--- a/phoenix-core/pom.xml
+++ b/phoenix-core/pom.xml
@@ -224,24 +224,24 @@
     <dependency>
       <groupId>co.cask.tephra</groupId>
       <artifactId>tephra-api</artifactId>
-      <version>0.4.2-SNAPSHOT</version>
+      <version>0.5.1-SNAPSHOT</version>
     </dependency>
     <dependency>
       <groupId>co.cask.tephra</groupId>
       <artifactId>tephra-core</artifactId>
-      <version>0.4.2-SNAPSHOT</version>
+      <version>0.5.1-SNAPSHOT</version>
     </dependency>
     <dependency>
       <groupId>co.cask.tephra</groupId>
       <artifactId>tephra-core</artifactId>
       <type>test-jar</type>
-      <version>0.4.2-SNAPSHOT</version>
+      <version>0.5.1-SNAPSHOT</version>
       <scope>test</scope>
     </dependency>
     <dependency>
       <groupId>co.cask.tephra</groupId>
       <artifactId>tephra-hbase-compat-0.98</artifactId>
-      <version>0.4.2-SNAPSHOT</version>
+      <version>0.5.1-SNAPSHOT</version>
     </dependency>
   
     <!-- Make sure we have all the antlr dependencies -->

http://git-wip-us.apache.org/repos/asf/phoenix/blob/716307f4/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/BaseMutableIndexIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/BaseMutableIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/BaseMutableIndexIT.java
index 2676548..000787b 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/BaseMutableIndexIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/BaseMutableIndexIT.java
@@ -1131,19 +1131,19 @@ public abstract class BaseMutableIndexIT extends BaseHBaseManagedTimeIT {
     		stmt.executeUpdate("upsert into DEMO values('cc1', 1.23, 'abc')");
     		conn.commit();
     		
-    		//assert values in index table 
-    		rs = stmt.executeQuery("select * from DEMO_IDX");
+    		//assert values in data table
+    		rs = stmt.executeQuery("select /*+ NO_INDEX */ v1, v2, v3 from DEMO");
     		assertTrue(rs.next());
-    		assertEquals(0, Doubles.compare(1.23, rs.getDouble(1)));
-    		assertEquals("cc1", rs.getString(2));
+    		assertEquals("cc1", rs.getString(1));
+    		assertEquals(0, Doubles.compare(1.23, rs.getDouble(2)));
     		assertEquals("abc", rs.getString(3));
     		assertFalse(rs.next());
     		
-    		//assert values in data table
-    		rs = stmt.executeQuery("select v1, v2, v3 from DEMO");
+    		//assert values in index table 
+    		rs = stmt.executeQuery("select * from DEMO_IDX");
     		assertTrue(rs.next());
-    		assertEquals("cc1", rs.getString(1));
-    		assertEquals(0, Doubles.compare(1.23, rs.getDouble(2)));
+    		assertEquals(0, Doubles.compare(1.23, rs.getDouble(1)));
+    		assertEquals("cc1", rs.getString(2));
     		assertEquals("abc", rs.getString(3));
     		assertFalse(rs.next());
     		

http://git-wip-us.apache.org/repos/asf/phoenix/blob/716307f4/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index 256e487..562d055 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -510,10 +510,12 @@ public class MutationState implements SQLCloseable {
     private void send(Iterator<TableRef> tableRefIterator) throws SQLException {
         int i = 0;
         long[] serverTimeStamps = null;
+        boolean sendAll = false;
         // Validate up front if not transactional so that we 
         if (tableRefIterator == null) {
             serverTimeStamps = validateAll();
             tableRefIterator = mutations.keySet().iterator();
+            sendAll = true;
         }
 
         // add tracing for this operation
@@ -635,7 +637,12 @@ public class MutationState implements SQLCloseable {
             if (tableRef.getTable().getType() != PTableType.INDEX) {
                 numRows -= valuesMap.size();
             }
-            tableRefIterator.remove(); // Remove batches as we process them
+            // Remove batches as we process them
+            if (sendAll) {
+            	tableRefIterator.remove(); // Iterating through actual map in this case
+            } else {
+            	mutations.remove(tableRef);
+            }
         }
         trace.close();
         assert(numRows==0);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/716307f4/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
index 39f885b..3d5da43 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixTransactionalIndexer.java
@@ -20,6 +20,7 @@ import java.util.Map;
 import java.util.Set;
 
 import co.cask.tephra.Transaction;
+import co.cask.tephra.Transaction.VisibilityLevel;
 import co.cask.tephra.TxConstants;
 import co.cask.tephra.hbase98.TransactionAwareHTable;
 
@@ -134,9 +135,14 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
             return;
         }
 
-        boolean readOwnWrites = m.getAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY) != null;
         Map<String,byte[]> updateAttributes = m.getAttributesMap();
         PhoenixIndexMetaData indexMetaData = new PhoenixIndexMetaData(c.getEnvironment(),updateAttributes);
+        if (m.getAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY) == null) {
+        	// Unless we're aborting the transaction, we do not want to see our own transaction writes,
+        	// since index maintenance requires seeing the previously committed data in order to function
+        	// properly.
+        	indexMetaData.getTransaction().setVisibility(VisibilityLevel.SNAPSHOT_EXCLUDE_CURRENT);
+        }
         Collection<Pair<Mutation, byte[]>> indexUpdates = null;
         // get the current span, or just use a null-span to avoid a bunch of if statements
         try (TraceScope scope = Trace.startSpan("Starting to build index updates")) {
@@ -146,7 +152,7 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
             }
 
             // get the index updates for all elements in this batch
-            indexUpdates = getIndexUpdates(c.getEnvironment(), indexMetaData, getMutationIterator(miniBatchOp), readOwnWrites);
+            indexUpdates = getIndexUpdates(c.getEnvironment(), indexMetaData, getMutationIterator(miniBatchOp));
 
             current.addTimelineAnnotation("Built index updates, doing preStep");
             TracingUtils.addAnnotation(current, "index update count", indexUpdates.size());
@@ -162,30 +168,7 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
         }
     }
 
-    private static final String TX_NO_READ_OWN_WRITES = "TX_NO_READ_OWN_WRITES";
-    @Override
-    public RegionScanner preScannerOpen(ObserverContext<RegionCoprocessorEnvironment> e, Scan scan, RegionScanner s) {
-        /*
-         * TODO: remove once Tephra gives us a way to not read our own writes.
-         *  Hack to force scan not to read their own writes. Since the mutations have already been
-         *  applied by the time the preBatchMutate hook is called, we need to adjust the max time
-         *  range down by one to prevent us from seeing the current state. Instead, we need to
-         *  see the state right before our Puts have been applied.
-         */
-        byte[] encoded = scan.getAttribute(TX_NO_READ_OWN_WRITES);
-        if (encoded != null) {
-            TimeRange range = scan.getTimeRange();
-            long maxTime = range.getMax();
-            try {
-                scan.setTimeRange(range.getMin(), maxTime == Long.MAX_VALUE ? maxTime : maxTime-1);
-            } catch (IOException e1) {
-                throw new RuntimeException(e1);
-            }
-        }
-        return s;
-    }
-
-    private Collection<Pair<Mutation, byte[]>> getIndexUpdates(RegionCoprocessorEnvironment env, PhoenixIndexMetaData indexMetaData, Iterator<Mutation> mutationIterator, boolean readOwnWrites) throws IOException {
+    private Collection<Pair<Mutation, byte[]>> getIndexUpdates(RegionCoprocessorEnvironment env, PhoenixIndexMetaData indexMetaData, Iterator<Mutation> mutationIterator) throws IOException {
         ResultScanner scanner = null;
         TransactionAwareHTable txTable = null;
         
@@ -230,9 +213,6 @@ public class PhoenixTransactionalIndexer extends BaseRegionObserver {
                     keys.add(PVarbinary.INSTANCE.getKeyRange(ptr.copyBytesIfNecessary()));
                 }
                 Scan scan = new Scan();
-                if (!readOwnWrites) {
-                    scan.setAttribute(TX_NO_READ_OWN_WRITES, PDataType.TRUE_BYTES); // TODO: remove when Tephra allows this
-                }
                 // Project all mutable columns
                 for (ColumnReference ref : mutableColumns) {
                     scan.addColumn(ref.getFamily(), ref.getQualifier());