You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@omid.apache.org by fp...@apache.org on 2016/05/11 18:06:45 UTC
[38/50] [abbrv] incubator-omid git commit: Parallel execution example
Parallel execution example
Change-Id: Idd5cac8391845aa08293ea71104c7973caa6e357
Project: http://git-wip-us.apache.org/repos/asf/incubator-omid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-omid/commit/e95a12fc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-omid/tree/e95a12fc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-omid/diff/e95a12fc
Branch: refs/heads/master
Commit: e95a12fcd1d43be11db719965803b7a4c556105c
Parents: 41d2e23
Author: Igor Katkov <ka...@yahoo-inc.com>
Authored: Thu May 5 11:00:34 2016 -0700
Committer: Igor Katkov <ka...@yahoo-inc.com>
Committed: Fri May 6 09:33:16 2016 -0700
----------------------------------------------------------------------
examples/pom.xml | 6 +
examples/run.sh | 11 +-
.../apache/omid/examples/ParallelExecution.java | 94 ++++++++
.../apache/omid/examples/RowIdGenerator.java | 23 ++
.../omid/examples/SnapshotIsolationExample.java | 215 +++++++++++--------
pom.xml | 2 +-
6 files changed, 260 insertions(+), 91 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/e95a12fc/examples/pom.xml
----------------------------------------------------------------------
diff --git a/examples/pom.xml b/examples/pom.xml
index 236e377..f0dc7ef 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -36,6 +36,12 @@
<version>${slf4j.version}</version>
</dependency>
+ <dependency>
+ <groupId>log4j</groupId>
+ <artifactId>apache-log4j-extras</artifactId>
+ <version>${log4j.version}</version>
+ </dependency>
+
</dependencies>
<build>
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/e95a12fc/examples/run.sh
----------------------------------------------------------------------
diff --git a/examples/run.sh b/examples/run.sh
index 6785919..b9323c3 100755
--- a/examples/run.sh
+++ b/examples/run.sh
@@ -22,7 +22,7 @@ function show_help() {
# $ $0 <example> [ tablename ] [column family name]
#
# where:
- # - <example> can be [ basic | si | conf ]
+ # - <example> can be [ basic | si | conf | parallel ]
#
# Example execution:
# ------------------
@@ -84,13 +84,16 @@ USER_OPTION=$1
shift
case ${USER_OPTION} in
basic)
- java -cp $KLASSPATH org.apache.omid.examples.BasicExample "$@"
+ java -cp $KLASSPATH -Dlog4j.configuration=file:${SCRIPTDIR}/conf/log4j.properties org.apache.omid.examples.BasicExample "$@"
;;
si)
- java -cp $KLASSPATH org.apache.omid.examples.SnapshotIsolationExample "$@"
+ java -cp $KLASSPATH -Dlog4j.configuration=file:${SCRIPTDIR}/conf/log4j.properties org.apache.omid.examples.SnapshotIsolationExample "$@"
+ ;;
+ parallel)
+ java -cp $KLASSPATH -Dlog4j.configuration=file:${SCRIPTDIR}/conf/log4j.properties org.apache.omid.examples.ParallelExecution "$@"
;;
conf)
- java -cp $KLASSPATH org.apache.omid.examples.ConfigurationExample "$@"
+ java -cp $KLASSPATH -Dlog4j.configuration=file:${SCRIPTDIR}/conf/log4j.properties org.apache.omid.examples.ConfigurationExample "$@"
;;
*)
show_help
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/e95a12fc/examples/src/main/java/org/apache/omid/examples/ParallelExecution.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/omid/examples/ParallelExecution.java b/examples/src/main/java/org/apache/omid/examples/ParallelExecution.java
new file mode 100644
index 0000000..0f6a543
--- /dev/null
+++ b/examples/src/main/java/org/apache/omid/examples/ParallelExecution.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.omid.examples;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.omid.transaction.RollbackException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Random;
+
+/**
+ * ****************************************************************************************************************
+ *
+ * Same as SnapshotIsolationExample only executes multiple transactions concurrently
+ *
+ * ****************************************************************************************************************
+ *
+ * Every worker thread owns its own {@link SnapshotIsolationExample} instance, configured with a random row id
+ * generator, and repeats the example scenario in an endless loop. Every {@code HEART_BEAT_INTERVAL_MS} milliseconds
+ * (at most) each worker reports how many cycles it completed and how many of them failed.
+ *
+ * Command line arguments: {@code [tablename] [column family name] [number of threads]}. The whole argument array is
+ * handed to {@link SnapshotIsolationExample}; the third argument is read here and defaults to the number of
+ * available processors.
+ *
+ * Please see {@link SnapshotIsolationExample}
+ */
+public class ParallelExecution {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ParallelExecution.class);
+
+    /** Minimum time, in milliseconds, between two progress reports of the same worker thread. */
+    private static final long HEART_BEAT_INTERVAL_MS = 10_000;
+
+    /**
+     * Starts the worker threads and returns; the workers are non-daemon threads that loop forever.
+     *
+     * @param args optional table name, column family name and number of threads, in that order
+     * @throws Exception if an example instance cannot be created
+     * @throws NumberFormatException if the third argument is not a valid integer
+     */
+    public static void main(String[] args) throws Exception {
+
+        LOG.info("Parsing the command line arguments");
+        int maxThreads = Runtime.getRuntime().availableProcessors();
+        if (args != null && args.length > 2 && StringUtils.isNotEmpty(args[2])) {
+            maxThreads = Integer.parseInt(args[2]);
+        }
+        LOG.info("Execute '{}' concurrent threads", maxThreads);
+
+        for (int i = 0; i < maxThreads; i++) {
+            // One example instance per thread, so workers do not share an instance
+            final SnapshotIsolationExample example = new SnapshotIsolationExample(args);
+            example.setRowIdGenerator(new RandomRowIdGenerator());
+            Thread t = new Thread(new Runnable() {
+                @Override
+                public void run() {
+                    long lastHeartBeatTime = System.currentTimeMillis();
+                    long counter = 0;
+                    long errorCounter = 0;
+                    while (true) {
+                        LOG.info("New cycle starts");
+                        try {
+                            example.execute();
+                            counter++;
+                        } catch (IOException | RollbackException | IllegalStateException e) {
+                            // A failed cycle must not stop the worker: count it and carry on
+                            LOG.error("Example cycle failed", e);
+                            errorCounter++;
+                        }
+                        if (System.currentTimeMillis() > lastHeartBeatTime + HEART_BEAT_INTERVAL_MS) {
+                            // Progress report, not a failure: logged at INFO instead of ERROR
+                            LOG.info("{} cycles executed, {} errors", counter, errorCounter);
+                            lastHeartBeatTime = System.currentTimeMillis();
+                        }
+                    }
+                }
+            });
+            t.setName(String.format("SnapshotIsolationExample thread %s/%s", i + 1, maxThreads));
+            t.start();
+        }
+
+    }
+
+    /** Generates row ids of the form {@code "id" + <random int>}. */
+    private static class RandomRowIdGenerator implements RowIdGenerator {
+
+        private final Random random = new Random();
+
+        @Override
+        public byte[] getRowId() {
+            return Bytes.toBytes("id" + random.nextInt());
+        }
+    }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/e95a12fc/examples/src/main/java/org/apache/omid/examples/RowIdGenerator.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/omid/examples/RowIdGenerator.java b/examples/src/main/java/org/apache/omid/examples/RowIdGenerator.java
new file mode 100644
index 0000000..4e5c631
--- /dev/null
+++ b/examples/src/main/java/org/apache/omid/examples/RowIdGenerator.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.omid.examples;
+
+/**
+ * Supplies the row id (row key bytes) that an example uses for its HBase Put and Get operations.
+ */
+interface RowIdGenerator {
+
+ /**
+ * @return the row id to use, as raw bytes
+ */
+ byte[] getRowId();
+}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/e95a12fc/examples/src/main/java/org/apache/omid/examples/SnapshotIsolationExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/omid/examples/SnapshotIsolationExample.java b/examples/src/main/java/org/apache/omid/examples/SnapshotIsolationExample.java
index a68d81a..60b82da 100644
--- a/examples/src/main/java/org/apache/omid/examples/SnapshotIsolationExample.java
+++ b/examples/src/main/java/org/apache/omid/examples/SnapshotIsolationExample.java
@@ -17,33 +17,37 @@
*/
package org.apache.omid.examples;
-import org.apache.omid.transaction.HBaseTransactionManager;
-import org.apache.omid.transaction.RollbackException;
-import org.apache.omid.transaction.TTable;
-import org.apache.omid.transaction.Transaction;
-import org.apache.omid.transaction.TransactionManager;
+import com.google.common.base.Preconditions;
+
+import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.omid.transaction.HBaseTransactionManager;
+import org.apache.omid.transaction.RollbackException;
+import org.apache.omid.transaction.TTable;
+import org.apache.omid.transaction.Transaction;
+import org.apache.omid.transaction.TransactionManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
import java.util.Arrays;
/**
* ****************************************************************************************************************
*
- * Example code which demonstrates the preservation of Snapshot Isolation when writing shared data concurrently
+ * Example code which demonstrates the preservation of Snapshot Isolation when writing shared data concurrently
*
* ****************************************************************************************************************
*
* Please @see{BasicExample} first
*
* In the code below, two concurrent transactions (Tx1 & Tx2), try to update the same column in HBase. This will result
- * in the rollback of Tx2 -the last one trying to commit- due to conflicts in the writeset with the previously
- * committed transaction Tx1. Also shows how Tx2 reads the right values from its own snapshot in HBase data.
+ * in the rollback of Tx2 -the last one trying to commit- due to conflicts in the writeset with the previously committed
+ * transaction Tx1. Also shows how Tx2 reads the right values from its own snapshot in HBase data.
*
* After building the package with 'mvn clean package' find the resulting examples-<version>-bin.tar.gz file in the
* 'examples/target' folder. Copy it to the target host and expand with 'tar -zxvf examples-<version>-bin.tar.gz'.
@@ -52,8 +56,8 @@ import java.util.Arrays;
* command line arguments. If a secure HBase deployment is needed, use also command line arguments to specify the
* principal (user) and keytab file.
*
- * The example requires a user table to perform transactional read/write operations. A table is already specified in
- * the default configuration, and can be created with the following command using the 'hbase shell':
+ * The example requires a user table to perform transactional read/write operations. A table is already specified in the
+ * default configuration, and can be created with the following command using the 'hbase shell':
*
* <pre>
* create 'MY_TX_TABLE', {NAME => 'MY_CF', VERSIONS => '2147483647', TTL => '2147483647'}
@@ -74,101 +78,140 @@ import java.util.Arrays;
public class SnapshotIsolationExample {
private static final Logger LOG = LoggerFactory.getLogger(SnapshotIsolationExample.class);
+ private final byte[] qualifier;
+ private final byte[] initialData;
+ private final byte[] dataValue1;
+ private final byte[] dataValue2;
+ private RowIdGenerator rowIdGenerator = new StaticRowIdGenerator();
+ private String userTableName;
+ private byte[] family;
+ private TransactionManager tm;
+ private TTable txTable;
public static void main(String[] args) throws Exception {
+        SnapshotIsolationExample example = new SnapshotIsolationExample(args);
+        try {
+            // Runs the Tx0/Tx1/Tx2 scenario once
+            example.execute();
+        } finally {
+            // Release the transaction manager and the table even when the scenario fails
+            example.close();
+        }
+    }
+ SnapshotIsolationExample(String[] args) throws IOException, InterruptedException {
LOG.info("Parsing the command line arguments");
- String userTableName = "MY_TX_TABLE";
+ userTableName = "MY_TX_TABLE";
if (args != null && args.length > 0 && StringUtils.isNotEmpty(args[0])) {
userTableName = args[0];
}
- byte[] family = Bytes.toBytes("MY_CF");
+ family = Bytes.toBytes("MY_CF");
if (args != null && args.length > 1 && StringUtils.isNotEmpty(args[1])) {
family = Bytes.toBytes(args[1]);
}
LOG.info("Table '{}', column family '{}'", userTableName, Bytes.toString(family));
- byte[] exampleRow = Bytes.toBytes("EXAMPLE_ROW");
- byte[] qualifier = Bytes.toBytes("MY_Q");
- byte[] initialData = Bytes.toBytes("initialVal");
- byte[] dataValue1 = Bytes.toBytes("val1");
- byte[] dataValue2 = Bytes.toBytes("val2");
+ qualifier = Bytes.toBytes("MY_Q");
+ initialData = Bytes.toBytes("initialVal");
+ dataValue1 = Bytes.toBytes("val1");
+ dataValue2 = Bytes.toBytes("val2");
- LOG.info("--------------------------------------------------------------------------------------------------");
+ LOG.info("--------");
LOG.info("NOTE: All Transactions in the Example access column {}:{}/{}/{} [TABLE:ROW/CF/Q]",
- userTableName, Bytes.toString(exampleRow), Bytes.toString(family), Bytes.toString(qualifier));
- LOG.info("--------------------------------------------------------------------------------------------------");
+ userTableName, Bytes.toString(rowIdGenerator.getRowId()), Bytes.toString(family),
+ Bytes.toString(qualifier));
+ LOG.info("--------");
LOG.info("Creating access to Omid Transaction Manager & Transactional Table '{}'", userTableName);
- try (TransactionManager tm = HBaseTransactionManager.newInstance();
- TTable txTable = new TTable(userTableName))
- {
-
- // A transaction Tx0 sets an initial value to a particular column in an specific row
- Transaction tx0 = tm.begin();
- Put initialPut = new Put(exampleRow);
- initialPut.add(family, qualifier, initialData);
- txTable.put(tx0, initialPut);
- tm.commit(tx0);
- LOG.info("Initial Transaction {} COMMITTED. Base value written in {}:{}/{}/{} = {}",
- tx0, userTableName, Bytes.toString(exampleRow), Bytes.toString(family),
- Bytes.toString(qualifier), Bytes.toString(initialData));
-
- // Transaction Tx1 starts, creates its own snapshot of the current data in HBase and writes new data
- Transaction tx1 = tm.begin();
- LOG.info("Transaction {} STARTED", tx1);
- Put tx1Put = new Put(exampleRow);
- tx1Put.add(family, qualifier, dataValue1);
- txTable.put(tx1, tx1Put);
- LOG.info("Transaction {} updates base value in {}:{}/{}/{} = {} in its own Snapshot",
- tx1, userTableName, Bytes.toString(exampleRow), Bytes.toString(family),
- Bytes.toString(qualifier), Bytes.toString(dataValue1));
-
- // A concurrent transaction Tx2 starts, creates its own snapshot and reads the column value
- Transaction tx2 = tm.begin();
- LOG.info("Concurrent Transaction {} STARTED", tx2);
- Get tx2Get = new Get(exampleRow);
- tx2Get.addColumn(family, qualifier);
- // As Tx1 is not yet committed, it should read the value set by Tx0 not the value written by Tx1
- Result tx2GetResult = txTable.get(tx2, tx2Get);
- assert Arrays.equals(tx2GetResult.value(), initialData);
- LOG.info("Concurrent Transaction {} should read base value in {}:{}/{}/{} from its Snapshot | Value read = {}",
- tx2, userTableName, Bytes.toString(exampleRow), Bytes.toString(family),
- Bytes.toString(qualifier), Bytes.toString(tx2GetResult.value()));
-
- // Transaction Tx1 tries to commit and as there're no conflicting changes, persists the new value in HBase
- tm.commit(tx1);
- LOG.info("Transaction {} COMMITTED. New column value {}:{}/{}/{} = {}",
- tx1, userTableName, Bytes.toString(exampleRow), Bytes.toString(family),
- Bytes.toString(qualifier), Bytes.toString(dataValue1));
-
- // Tx2 reading again after Tx1 commit must read data from its snapshot...
- tx2Get = new Get(exampleRow);
- tx2Get.addColumn(family, qualifier);
- tx2GetResult = txTable.get(tx2, tx2Get);
- // ...so it must read the initial value written by Tx0
- LOG.info("Concurrent Transaction {} should read again base value in {}:{}/{}/{} from its Snapshot | Value read = {}",
- tx2, userTableName, Bytes.toString(exampleRow), Bytes.toString(family),
- Bytes.toString(qualifier), Bytes.toString(tx2GetResult.value()));
-
- // Tx2 tries to write the column written by the committed concurrent transaction Tx1...
- Put tx2Put = new Put(exampleRow);
- tx2Put.add(family, qualifier, dataValue2);
- txTable.put(tx2, tx2Put);
- LOG.info("Concurrent Transaction {} updates {}:{}/{}/{} = {} in its own Snapshot (Will conflict with {} at commit time)",
- tx2, userTableName, Bytes.toString(exampleRow), Bytes.toString(family),
- Bytes.toString(qualifier), Bytes.toString(dataValue1), tx1);
-
- // ... and when committing, Tx2 has to abort due to concurrent conflicts with committed transaction Tx1
- try {
- LOG.info("Concurrent Transaction {} TRYING TO COMMIT", tx2);
- tm.commit(tx2);
- } catch (RollbackException e) {
- LOG.error("Concurrent Transaction {} ROLLED-BACK!!! : {}", tx2, e.getMessage());
- }
+ tm = HBaseTransactionManager.newInstance();
+ txTable = new TTable(userTableName);
+ }
+    /**
+     * Runs one cycle of the example on the row supplied by the configured {@link RowIdGenerator}:
+     * Tx0 writes and commits an initial value, Tx1 overwrites it, a concurrent Tx2 reads the column before and after
+     * Tx1 commits (expecting the initial value both times) and then writes the same column, so that its commit is
+     * expected to be rolled back.
+     *
+     * @throws IOException if a table operation fails with it
+     * @throws RollbackException if the commit of Tx0 or Tx1 is rolled back (the rollback of Tx2 is expected and
+     *                           handled here)
+     * @throws IllegalStateException if Tx2 reads something other than the initial value, or if the commit of Tx2
+     *                               unexpectedly succeeds
+     */
+    void execute() throws IOException, RollbackException {
+
+        // A transaction Tx0 sets an initial value to a particular column in an specific row
+        Transaction tx0 = tm.begin();
+        byte[] rowId = rowIdGenerator.getRowId();
+        Put initialPut = new Put(rowId);
+        initialPut.add(family, qualifier, initialData);
+        txTable.put(tx0, initialPut);
+        tm.commit(tx0);
+        LOG.info("Initial Transaction {} COMMITTED. Base value written in {}:{}/{}/{} = {}",
+                 tx0, userTableName, Bytes.toString(rowId), Bytes.toString(family),
+                 Bytes.toString(qualifier), Bytes.toString(initialData));
+
+        // Transaction Tx1 starts, creates its own snapshot of the current data in HBase and writes new data
+        Transaction tx1 = tm.begin();
+        LOG.info("Transaction {} STARTED", tx1);
+        Put tx1Put = new Put(rowId);
+        tx1Put.add(family, qualifier, dataValue1);
+        txTable.put(tx1, tx1Put);
+        LOG.info("Transaction {} updates base value in {}:{}/{}/{} = {} in its own Snapshot",
+                 tx1, userTableName, Bytes.toString(rowId), Bytes.toString(family),
+                 Bytes.toString(qualifier), Bytes.toString(dataValue1));
+
+        // A concurrent transaction Tx2 starts, creates its own snapshot and reads the column value
+        Transaction tx2 = tm.begin();
+        LOG.info("Concurrent Transaction {} STARTED", tx2);
+        Get tx2Get = new Get(rowId);
+        tx2Get.addColumn(family, qualifier);
+        Result tx2GetResult = txTable.get(tx2, tx2Get);
+        Preconditions.checkState(Arrays.equals(tx2GetResult.value(), initialData),
+                                 "As Tx1 is not yet committed, Tx2 should read the value set by Tx0 not the value written by Tx1");
+        LOG.info(
+            "Concurrent Transaction {} should read base value in {}:{}/{}/{} from its Snapshot | Value read = {}",
+            tx2, userTableName, Bytes.toString(rowId), Bytes.toString(family),
+            Bytes.toString(qualifier), Bytes.toString(tx2GetResult.value()));
+
+        // Transaction Tx1 tries to commit and as there're no conflicting changes, persists the new value in HBase
+        tm.commit(tx1);
+        LOG.info("Transaction {} COMMITTED. New column value {}:{}/{}/{} = {}",
+                 tx1, userTableName, Bytes.toString(rowId), Bytes.toString(family),
+                 Bytes.toString(qualifier), Bytes.toString(dataValue1));
+
+        // Tx2 reading again after Tx1 commit must read data from its snapshot...
+        tx2Get = new Get(rowId);
+        tx2Get.addColumn(family, qualifier);
+        tx2GetResult = txTable.get(tx2, tx2Get);
+        // ...so it must read the initial value written by Tx0
+        LOG.info(
+            "Concurrent Transaction {} should read again base value in {}:{}/{}/{} from its Snapshot | Value read = {}",
+            tx2, userTableName, Bytes.toString(rowId), Bytes.toString(family),
+            Bytes.toString(qualifier), Bytes.toString(tx2GetResult.value()));
+        Preconditions.checkState(Arrays.equals(tx2GetResult.value(), initialData),
+                                 "Tx2 must read the initial value written by Tx0");
+
+        // Tx2 tries to write the column written by the committed concurrent transaction Tx1...
+        Put tx2Put = new Put(rowId);
+        tx2Put.add(family, qualifier, dataValue2);
+        txTable.put(tx2, tx2Put);
+        // Report the value Tx2 actually wrote (dataValue2), not the one written by Tx1
+        LOG.info(
+            "Concurrent Transaction {} updates {}:{}/{}/{} = {} in its own Snapshot (Will conflict with {} at commit time)",
+            tx2, userTableName, Bytes.toString(rowId), Bytes.toString(family),
+            Bytes.toString(qualifier), Bytes.toString(dataValue2), tx1);
+
+        // ... and when committing, Tx2 has to abort due to concurrent conflicts with committed transaction Tx1
+        try {
+            LOG.info("Concurrent Transaction {} TRYING TO COMMIT", tx2);
+            tm.commit(tx2);
+            // should throw an exception
+            Preconditions.checkState(false, "Should have thrown RollbackException");
+        } catch (RollbackException e) {
+            LOG.info("Concurrent Transaction {} ROLLED-BACK : {}", tx2, e.getMessage());
         }
+    }
+
+    /**
+     * Closes the transaction manager and the transactional table.
+     *
+     * @throws IOException if closing either resource fails
+     */
+    private void close() throws IOException {
+        try {
+            tm.close();
+        } finally {
+            // Close the table even when closing the transaction manager fails
+            txTable.close();
+        }
+    }
+
+    /**
+     * Replaces the generator that supplies the row id used by {@code execute()}.
+     *
+     * @param rowIdGenerator the generator to use from now on; must not be null
+     * @throws NullPointerException if {@code rowIdGenerator} is null
+     */
+    void setRowIdGenerator(RowIdGenerator rowIdGenerator) {
+        // Fail here rather than with an unexplained NullPointerException in a later execute() call
+        this.rowIdGenerator = Preconditions.checkNotNull(rowIdGenerator, "rowIdGenerator must not be null");
     }
+    /**
+     * Default generator: always returns the bytes of the fixed row id {@code "EXAMPLE_ROW"}.
+     *
+     * Declared static because it uses no state of the enclosing example instance.
+     */
+    private static class StaticRowIdGenerator implements RowIdGenerator {
+
+        @Override
+        public byte[] getRowId() {
+            return Bytes.toBytes("EXAMPLE_ROW");
+        }
+    }
}
+
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/e95a12fc/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 84881d5..32e0532 100644
--- a/pom.xml
+++ b/pom.xml
@@ -139,7 +139,7 @@
<guice.version>3.0</guice.version>
<testng.version>6.8.8</testng.version>
<slf4j.version>1.7.7</slf4j.version>
- <log4j.version>1.2.14</log4j.version>
+ <log4j.version>1.2.17</log4j.version>
<netty.version>3.2.6.Final</netty.version>
<protobuf.version>2.5.0</protobuf.version>
<mockito.version>1.9.5</mockito.version>