You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@omid.apache.org by oh...@apache.org on 2017/07/10 06:48:15 UTC

incubator-omid git commit: [OMID-71] Omid's transaction manager detects conflicts at the cell level. Meaning that two concurrent transactions write to different columns at the same row do not conflict. This semantics is not suitable for Apache Phoenix

Repository: incubator-omid
Updated Branches:
  refs/heads/master 0c3713617 -> 0005b3603


[OMID-71] Omid's transaction manager detects conflicts at the
 cell level, meaning that two concurrent transactions that write to different
 columns of the same row do not conflict. These semantics are not suitable for
 Apache Phoenix, which requires conflict detection at the row level. This
 commit augments Omid with row-level conflict analysis.


Project: http://git-wip-us.apache.org/repos/asf/incubator-omid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-omid/commit/0005b360
Tree: http://git-wip-us.apache.org/repos/asf/incubator-omid/tree/0005b360
Diff: http://git-wip-us.apache.org/repos/asf/incubator-omid/diff/0005b360

Branch: refs/heads/master
Commit: 0005b3603571b28e01e5fbd8943e23efc5ead781
Parents: 0c37136
Author: Ohad Shacham <oh...@yahoo-inc.com>
Authored: Wed Jun 28 09:20:33 2017 +0300
Committer: Ohad Shacham <oh...@yahoo-inc.com>
Committed: Wed Jun 28 09:20:33 2017 +0300

----------------------------------------------------------------------
 .../apache/omid/transaction/HBaseCellId.java    |  15 +-
 .../HBaseOmidClientConfiguration.java           |  10 +
 .../java/org/apache/omid/tso/client/CellId.java |   1 +
 .../tso/client/OmidClientConfiguration.java     |  12 ++
 .../org/apache/omid/tso/client/TSOClient.java   |  35 +++-
 .../org/apache/omid/tso/client/TSOProtocol.java |   2 +
 .../apache/omid/tso/util/DummyCellIdImpl.java   |  10 +
 .../src/main/resources/omid-client-config.yml   |   6 +-
 .../TestTSOClientRowAndCellLevelConflict.java   | 203 +++++++++++++++++++
 9 files changed, 291 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/0005b360/hbase-client/src/main/java/org/apache/omid/transaction/HBaseCellId.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/omid/transaction/HBaseCellId.java b/hbase-client/src/main/java/org/apache/omid/transaction/HBaseCellId.java
index 8d0641b..582bfbd 100644
--- a/hbase-client/src/main/java/org/apache/omid/transaction/HBaseCellId.java
+++ b/hbase-client/src/main/java/org/apache/omid/transaction/HBaseCellId.java
@@ -17,7 +17,9 @@
  */
 package org.apache.omid.transaction;
 
+import com.google.common.hash.Hasher;
 import com.google.common.hash.Hashing;
+
 import org.apache.omid.tso.client.CellId;
 import org.apache.hadoop.hbase.client.HTableInterface;
 
@@ -67,9 +69,13 @@ public class HBaseCellId implements CellId {
                 + ":" + timestamp;
     }
 
+    // Creates a new Murmur3 128-bit hasher; each id computation in this class starts from a fresh instance.
+    private Hasher getHasher() {
+        return Hashing.murmur3_128().newHasher();
+    }
+
     @Override
     public long getCellId() {
-        return Hashing.murmur3_128().newHasher()
+        return getHasher()
                 .putBytes(table.getTableName())
                 .putBytes(row)
                 .putBytes(family)
@@ -77,4 +83,11 @@ public class HBaseCellId implements CellId {
                 .hash().asLong();
     }
 
+    /**
+     * Computes the row-level identifier of this cell by hashing only the table name and the row key,
+     * so the family and qualifier do not influence the result.
+     */
+    @Override
+    public long getRowId() {
+        final Hasher rowHasher = getHasher();
+        rowHasher.putBytes(table.getTableName());
+        rowHasher.putBytes(row);
+        return rowHasher.hash().asLong();
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/0005b360/hbase-client/src/main/java/org/apache/omid/transaction/HBaseOmidClientConfiguration.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/omid/transaction/HBaseOmidClientConfiguration.java b/hbase-client/src/main/java/org/apache/omid/transaction/HBaseOmidClientConfiguration.java
index f5a1823..d945688 100644
--- a/hbase-client/src/main/java/org/apache/omid/transaction/HBaseOmidClientConfiguration.java
+++ b/hbase-client/src/main/java/org/apache/omid/transaction/HBaseOmidClientConfiguration.java
@@ -20,9 +20,11 @@ package org.apache.omid.transaction;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.inject.Inject;
 import com.google.inject.name.Named;
+
 import org.apache.omid.YAMLUtils;
 import org.apache.omid.metrics.MetricsRegistry;
 import org.apache.omid.tools.hbase.SecureHBaseConfig;
+import org.apache.omid.tso.client.OmidClientConfiguration.ConflictDetectionLevel;
 import org.apache.omid.tso.client.OmidClientConfiguration.PostCommitMode;
 import org.apache.omid.tso.client.OmidClientConfiguration;
 import org.apache.hadoop.conf.Configuration;
@@ -73,6 +75,14 @@ public class HBaseOmidClientConfiguration extends SecureHBaseConfig {
         omidClientConfiguration.setPostCommitMode(postCommitMode);
     }
 
+    /**
+     * Returns the conflict detection level held by the wrapped Omid client configuration.
+     */
+    public ConflictDetectionLevel getConflictAnalysisLevel() {
+        return omidClientConfiguration.getConflictAnalysisLevel();
+    }
+
+    /**
+     * Stores the given conflict detection level in the wrapped Omid client configuration.
+     */
+    public void setConflictAnalysisLevel(ConflictDetectionLevel conflictAnalysisLevel) {
+        omidClientConfiguration.setConflictAnalysisLevel(conflictAnalysisLevel);
+    }
+
     public String getCommitTableName() {
         return commitTableName;
     }

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/0005b360/transaction-client/src/main/java/org/apache/omid/tso/client/CellId.java
----------------------------------------------------------------------
diff --git a/transaction-client/src/main/java/org/apache/omid/tso/client/CellId.java b/transaction-client/src/main/java/org/apache/omid/tso/client/CellId.java
index e40105e..962faf9 100644
--- a/transaction-client/src/main/java/org/apache/omid/tso/client/CellId.java
+++ b/transaction-client/src/main/java/org/apache/omid/tso/client/CellId.java
@@ -20,5 +20,6 @@ package org.apache.omid.tso.client;
 public interface CellId {
 
+    // Identifier of the individual cell; sent to the TSO when conflicts are detected at CELL level.
     long getCellId();
+    // Identifier of the row containing the cell; sent to the TSO when conflicts are detected at ROW level.
+    long getRowId();
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/0005b360/transaction-client/src/main/java/org/apache/omid/tso/client/OmidClientConfiguration.java
----------------------------------------------------------------------
diff --git a/transaction-client/src/main/java/org/apache/omid/tso/client/OmidClientConfiguration.java b/transaction-client/src/main/java/org/apache/omid/tso/client/OmidClientConfiguration.java
index 3542c55..6bc6481 100644
--- a/transaction-client/src/main/java/org/apache/omid/tso/client/OmidClientConfiguration.java
+++ b/transaction-client/src/main/java/org/apache/omid/tso/client/OmidClientConfiguration.java
@@ -32,6 +32,8 @@ public class OmidClientConfiguration {
 
     public enum PostCommitMode {SYNC, ASYNC}
 
+    // Granularity at which write sets are compared for conflicts: per cell (CELL) or per row (ROW).
+    public enum ConflictDetectionLevel {CELL, ROW}
+
     // Basic connection related params
 
     private ConnType connectionType = ConnType.DIRECT;
@@ -51,6 +53,7 @@ public class OmidClientConfiguration {
     // Transaction Manager related params
 
     private PostCommitMode postCommitMode = PostCommitMode.SYNC;
+    private ConflictDetectionLevel conflictAnalysisLevel = ConflictDetectionLevel.CELL;
 
     // ----------------------------------------------------------------------------------------------------------------
     // Instantiation
@@ -174,4 +177,13 @@ public class OmidClientConfiguration {
         this.postCommitMode = postCommitMode;
     }
 
+    /**
+     * Returns the configured conflict detection level; the field is initialized to CELL.
+     */
+    public ConflictDetectionLevel getConflictAnalysisLevel() {
+        return conflictAnalysisLevel;
+    }
+
+    /**
+     * Sets the conflict detection level. Can also be supplied through the optional
+     * injection binding named "omid.tm.conflictAnalysisLevel".
+     */
+    @Inject(optional = true)
+    @Named("omid.tm.conflictAnalysisLevel")
+    public void setConflictAnalysisLevel(ConflictDetectionLevel conflictAnalysisLevel) {
+        this.conflictAnalysisLevel = conflictAnalysisLevel;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/0005b360/transaction-client/src/main/java/org/apache/omid/tso/client/TSOClient.java
----------------------------------------------------------------------
diff --git a/transaction-client/src/main/java/org/apache/omid/tso/client/TSOClient.java b/transaction-client/src/main/java/org/apache/omid/tso/client/TSOClient.java
index 1690ca6..12e3d13 100644
--- a/transaction-client/src/main/java/org/apache/omid/tso/client/TSOClient.java
+++ b/transaction-client/src/main/java/org/apache/omid/tso/client/TSOClient.java
@@ -21,7 +21,10 @@ import com.google.common.base.Charsets;
 import com.google.common.net.HostAndPort;
 import com.google.common.util.concurrent.AbstractFuture;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 import org.apache.omid.proto.TSOProto;
+import org.apache.omid.transaction.TransactionException;
+import org.apache.omid.tso.client.OmidClientConfiguration.ConflictDetectionLevel;
 import org.apache.omid.zk.ZKUtils;
 import org.apache.statemachine.StateMachine;
 import org.apache.curator.framework.CuratorFramework;
@@ -54,6 +57,7 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.ArrayDeque;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Queue;
@@ -63,6 +67,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
+
 /**
  * Describes the abstract methods to communicate to the TSO server
  */
@@ -92,6 +97,10 @@ public class TSOClient implements TSOProtocol, NodeCacheListener {
     private InetSocketAddress tsoAddr;
     private String zkCurrentTsoPath;
 
+    // Conflict detection level of the entire system. Can either be Row or Cell level.
+    private ConflictDetectionLevel conflictDetectionLevel;
+    private Set<Long> rowLevelWriteSet;
+
     // ----------------------------------------------------------------------------------------------------------------
     // Construction
     // ----------------------------------------------------------------------------------------------------------------
@@ -159,6 +168,9 @@ public class TSOClient implements TSOProtocol, NodeCacheListener {
         bootstrap.setOption("keepAlive", true);
         bootstrap.setOption("reuseAddress", true);
         bootstrap.setOption("connectTimeoutMillis", 100);
+
+        conflictDetectionLevel = omidConf.getConflictAnalysisLevel();
+        rowLevelWriteSet = new HashSet<Long>();
     }
 
     // ----------------------------------------------------------------------------------------------------------------
@@ -186,8 +198,29 @@ public class TSOClient implements TSOProtocol, NodeCacheListener {
         TSOProto.Request.Builder builder = TSOProto.Request.newBuilder();
         TSOProto.CommitRequest.Builder commitbuilder = TSOProto.CommitRequest.newBuilder();
         commitbuilder.setStartTimestamp(transactionId);
+
+        rowLevelWriteSet.clear();
         for (CellId cell : cells) {
-            commitbuilder.addCellId(cell.getCellId());
+            long id;
+
+            switch (conflictDetectionLevel) {
+            case ROW:
+                id = cell.getRowId();
+                if (rowLevelWriteSet.contains(id)) {
+                    continue;
+                } else {
+                    rowLevelWriteSet.add(id);
+                }
+                break;
+            case CELL:
+                id = cell.getCellId();
+                break;
+            default:
+                id = 0;
+                assert (false);
+            }
+
+            commitbuilder.addCellId(id);
         }
         builder.setCommitRequest(commitbuilder.build());
         RequestEvent request = new RequestEvent(builder.build(), requestMaxRetries);

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/0005b360/transaction-client/src/main/java/org/apache/omid/tso/client/TSOProtocol.java
----------------------------------------------------------------------
diff --git a/transaction-client/src/main/java/org/apache/omid/tso/client/TSOProtocol.java b/transaction-client/src/main/java/org/apache/omid/tso/client/TSOProtocol.java
index 198913a..cb51ff2 100644
--- a/transaction-client/src/main/java/org/apache/omid/tso/client/TSOProtocol.java
+++ b/transaction-client/src/main/java/org/apache/omid/tso/client/TSOProtocol.java
@@ -19,6 +19,8 @@ package org.apache.omid.tso.client;
 
 import java.util.Set;
 
+import org.apache.omid.tso.client.OmidClientConfiguration.ConflictDetectionLevel;
+
 /**
  * Defines the protocol used on the client side to abstract communication to the TSO server
  */

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/0005b360/transaction-client/src/main/java/org/apache/omid/tso/util/DummyCellIdImpl.java
----------------------------------------------------------------------
diff --git a/transaction-client/src/main/java/org/apache/omid/tso/util/DummyCellIdImpl.java b/transaction-client/src/main/java/org/apache/omid/tso/util/DummyCellIdImpl.java
index 4556757..f6fe9ad 100644
--- a/transaction-client/src/main/java/org/apache/omid/tso/util/DummyCellIdImpl.java
+++ b/transaction-client/src/main/java/org/apache/omid/tso/util/DummyCellIdImpl.java
@@ -22,9 +22,15 @@ import org.apache.omid.tso.client.CellId;
 public class DummyCellIdImpl implements CellId {
 
     private final long cellId;
+    private final long rowId;
 
+    // Convenience constructor: the given value is used as both the cell id and the row id.
     public DummyCellIdImpl(long cellId) {
+        this(cellId, cellId);
+    }
+
+    // Creates a dummy cell whose cell id and row id are chosen independently by the caller.
+    public DummyCellIdImpl(long cellId, long rowId) {
         this.cellId = cellId;
+        this.rowId = rowId;
     }
 
     @Override
@@ -32,4 +38,8 @@ public class DummyCellIdImpl implements CellId {
         return cellId;
     }
 
+    // Returns the row id supplied at construction time.
+    @Override
+    public long getRowId() {
+        return rowId;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/0005b360/transaction-client/src/main/resources/omid-client-config.yml
----------------------------------------------------------------------
diff --git a/transaction-client/src/main/resources/omid-client-config.yml b/transaction-client/src/main/resources/omid-client-config.yml
index 4263c35..478bd48 100644
--- a/transaction-client/src/main/resources/omid-client-config.yml
+++ b/transaction-client/src/main/resources/omid-client-config.yml
@@ -36,4 +36,8 @@ executorThreads: 3
 
 # Configure whether the TM performs the post-commit actions for a tx (update shadow cells and clean commit table entry)
 # before returning to the control to the client (SYNC) or in parallel (ASYNC)
-postCommitMode: !!org.apache.omid.tso.client.OmidClientConfiguration$PostCommitMode SYNC
\ No newline at end of file
+postCommitMode: !!org.apache.omid.tso.client.OmidClientConfiguration$PostCommitMode SYNC
+
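+# Conflict detection level used by the client when building commit requests.
+# Either CELL or ROW; the default is CELL.
+# NOTE(review): this key is named conflictDetectionLevel, while the Java property is conflictAnalysisLevel - confirm the loader maps it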
+conflictDetectionLevel: !!org.apache.omid.tso.client.OmidClientConfiguration$ConflictDetectionLevel CELL

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/0005b360/tso-server/src/test/java/org/apache/omid/tso/client/TestTSOClientRowAndCellLevelConflict.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/client/TestTSOClientRowAndCellLevelConflict.java b/tso-server/src/test/java/org/apache/omid/tso/client/TestTSOClientRowAndCellLevelConflict.java
new file mode 100644
index 0000000..da655a3
--- /dev/null
+++ b/tso-server/src/test/java/org/apache/omid/tso/client/TestTSOClientRowAndCellLevelConflict.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.omid.tso.client;
+
+import com.google.common.collect.Sets;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.Module;
+
+import org.apache.omid.TestUtils;
+import org.apache.omid.tso.TSOMockModule;
+import org.apache.omid.tso.TSOServer;
+import org.apache.omid.tso.TSOServerConfig;
+import org.apache.omid.tso.client.OmidClientConfiguration.ConflictDetectionLevel;
+import org.apache.omid.tso.util.DummyCellIdImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
+/**
+ * Verifies that the TSO client reports conflicts according to the configured conflict detection level:
+ * with ROW, two concurrent transactions conflict when they write to the same row; with CELL, they
+ * conflict only when they write to the same cell.
+ */
+public class TestTSOClientRowAndCellLevelConflict {
+
+    private static final Logger LOG = LoggerFactory.getLogger(TestTSOClientRowAndCellLevelConflict.class);
+
+    private static final String TSO_SERVER_HOST = "localhost";
+    private static final int TSO_SERVER_PORT = 5678;
+
+    // Client configuration pointing at the test TSO server; the conflict detection level is set per test
+    private OmidClientConfiguration tsoClientConf;
+
+    // Required infrastructure for TSOClient test
+    private TSOServer tsoServer;
+
+    /**
+     * Starts a fresh mock TSO server and prepares a client configuration that connects to it.
+     */
+    @BeforeMethod
+    public void beforeMethod() throws Exception {
+
+        TSOServerConfig tsoConfig = new TSOServerConfig();
+        tsoConfig.setConflictMapSize(1000);
+        tsoConfig.setPort(TSO_SERVER_PORT);
+        tsoConfig.setNumConcurrentCTWriters(2);
+        Module tsoServerMockModule = new TSOMockModule(tsoConfig);
+        Injector injector = Guice.createInjector(tsoServerMockModule);
+
+        LOG.info("==================================================================================================");
+        LOG.info("======================================= Init TSO Server ==========================================");
+        LOG.info("==================================================================================================");
+
+        tsoServer = injector.getInstance(TSOServer.class);
+        tsoServer.startAndWait();
+        TestUtils.waitForSocketListening(TSO_SERVER_HOST, TSO_SERVER_PORT, 100);
+
+        LOG.info("==================================================================================================");
+        LOG.info("===================================== TSO Server Initialized =====================================");
+        LOG.info("==================================================================================================");
+
+        tsoClientConf = new OmidClientConfiguration();
+        tsoClientConf.setConnectionString(TSO_SERVER_HOST + ":" + TSO_SERVER_PORT);
+
+    }
+
+    /**
+     * Stops the TSO server and waits until its port is released so the next test can bind it again.
+     */
+    @AfterMethod
+    public void afterMethod() throws Exception {
+        tsoServer.stopAndWait();
+        tsoServer = null;
+        TestUtils.waitForSocketNotListening(TSO_SERVER_HOST, TSO_SERVER_PORT, 1000);
+    }
+
+    /** ROW level: different cells of the same row must conflict. */
+    @Test(timeOut = 30_000)
+    public void testRowLevelConflictAnalysisConflict() throws Exception {
+
+        CellId c1 = new DummyCellIdImpl(0xdeadbeefL, 0xdeadbeeeL);
+        CellId c2 = new DummyCellIdImpl(0xfeedcafeL, 0xdeadbeeeL);
+
+        assertTrue(secondCommitAborts(ConflictDetectionLevel.ROW, c1, c2), "Transaction should be aborted");
+    }
+
+    /** ROW level: writes to different rows must not conflict, even though the dummy cell ids are equal. */
+    @Test(timeOut = 30_000)
+    public void testRowLevelConflictAnalysisCommit() throws Exception {
+
+        CellId c1 = new DummyCellIdImpl(0xdeadbeefL, 0xdeadbeeeL);
+        CellId c2 = new DummyCellIdImpl(0xdeadbeefL, 0xdeadbeefL);
+
+        assertFalse(secondCommitAborts(ConflictDetectionLevel.ROW, c1, c2), "Transaction should be committed");
+    }
+
+    /** CELL level: writes to the same cell id must conflict, even though the dummy row ids differ. */
+    @Test(timeOut = 30_000)
+    public void testCellLevelConflictAnalysisConflict() throws Exception {
+
+        CellId c1 = new DummyCellIdImpl(0xdeadbeefL, 0xdeadbeeeL);
+        CellId c2 = new DummyCellIdImpl(0xdeadbeefL, 0xfeedcafeL);
+
+        assertTrue(secondCommitAborts(ConflictDetectionLevel.CELL, c1, c2), "Transaction should be aborted");
+    }
+
+    /** CELL level: different cells of the same row must not conflict. */
+    @Test(timeOut = 30_000)
+    public void testCellLevelConflictAnalysisCommit() throws Exception {
+
+        CellId c1 = new DummyCellIdImpl(0xdeadbeefL, 0xdeadbeeeL);
+        CellId c2 = new DummyCellIdImpl(0xfeedcafeL, 0xdeadbeeeL);
+
+        assertFalse(secondCommitAborts(ConflictDetectionLevel.CELL, c1, c2), "Transaction should be committed");
+    }
+
+    /**
+     * Runs two concurrent transactions, each writing a single cell, and reports the fate of the second.
+     *
+     * @param level       conflict detection level the client is created with
+     * @param firstWrite  cell written by the transaction that commits first
+     * @param secondWrite cell written by the transaction that commits second
+     * @return true if the second commit was rejected with an AbortException, false if it committed
+     * @throws Exception if any request fails for a reason other than an abort of the second commit,
+     *                   so that infrastructure errors are never mistaken for a successful commit
+     */
+    private boolean secondCommitAborts(ConflictDetectionLevel level, CellId firstWrite, CellId secondWrite)
+            throws Exception {
+
+        tsoClientConf.setConflictAnalysisLevel(level);
+
+        TSOClient client = TSOClient.newInstance(tsoClientConf);
+
+        Set<CellId> testWriteSet1 = Sets.newHashSet(firstWrite);
+        Set<CellId> testWriteSet2 = Sets.newHashSet(secondWrite);
+
+        // Both start timestamps are taken before either commit, which makes the transactions concurrent
+        long ts1 = client.getNewStartTimestamp().get();
+        long ts2 = client.getNewStartTimestamp().get();
+
+        client.commit(ts1, testWriteSet1).get();
+
+        try {
+            client.commit(ts2, testWriteSet2).get();
+        } catch (ExecutionException e) {
+            if (e.getCause() instanceof AbortException) {
+                return true;
+            }
+            // Anything else is an unexpected failure and must fail the calling test
+            throw e;
+        }
+
+        return false;
+    }
+
+}