You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@omid.apache.org by oh...@apache.org on 2018/09/17 06:49:41 UTC
[1/2] incubator-omid git commit: [OMID-110] - support hbase-2
Repository: incubator-omid
Updated Branches:
refs/heads/phoenix-integration 75dc81775 -> 846e5a587
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/846e5a58/hbase-shims/hbase-2/pom.xml
----------------------------------------------------------------------
diff --git a/hbase-shims/hbase-2/pom.xml b/hbase-shims/hbase-2/pom.xml
new file mode 100644
index 0000000..1883915
--- /dev/null
+++ b/hbase-shims/hbase-2/pom.xml
@@ -0,0 +1,28 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>omid-shims-aggregator</artifactId>
+ <groupId>org.apache.omid</groupId>
+ <version>0.8.2.11-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>omid-hbase2-shims</artifactId>
+ <name>Shims layer for HBase 2.x</name>
+ <packaging>jar</packaging>
+
+ <properties>
+ <hbase.version>${hbase2.version}</hbase.version>
+ </properties>
+
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-endpoint</artifactId>
+ <version>${hbase2.version}</version>
+ </dependency>
+ </dependencies>
+</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/846e5a58/hbase-shims/hbase-2/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java
----------------------------------------------------------------------
diff --git a/hbase-shims/hbase-2/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java b/hbase-shims/hbase-2/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java
new file mode 100644
index 0000000..961a11c
--- /dev/null
+++ b/hbase-shims/hbase-2/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor;
+
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.ScanType;
+import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
+import java.io.IOException;
+
+
+
+/**
+ * Base class for region observers in the HBase 2.x shim layer.
+ *
+ * <p>Implements both {@link RegionObserver} and {@link RegionCoprocessor} and bridges the
+ * six-argument {@code preCompact} hook (the one that carries a
+ * {@link CompactionLifeCycleTracker}) to a five-argument overload, so that subclasses only
+ * have to override the five-argument form.
+ */
+public abstract class BaseRegionObserver implements RegionObserver, RegionCoprocessor {
+
+    /**
+     * Compaction hook declared by {@link RegionObserver}. The {@code tracker} argument is not
+     * used; every other argument is forwarded to the five-argument overload.
+     *
+     * @param c        observer context, passed through unchanged
+     * @param store    store being compacted, passed through unchanged
+     * @param scanner  scanner over the compaction input, passed through unchanged
+     * @param scanType scan type of the compaction, passed through unchanged
+     * @param tracker  ignored by this shim
+     * @param request  compaction request, passed through unchanged
+     * @return whatever the five-argument overload returns
+     * @throws IOException if the five-argument overload throws it
+     */
+    @Override // guards against the HBase hook signature drifting away from this bridge
+    public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> c,
+                                      Store store,
+                                      InternalScanner scanner,
+                                      ScanType scanType,
+                                      CompactionLifeCycleTracker tracker,
+                                      CompactionRequest request) throws IOException {
+        return preCompact(c, store, scanner, scanType, request);
+    }
+
+    /**
+     * Tracker-less compaction hook intended to be overridden by subclasses.
+     *
+     * @param env      observer context
+     * @param store    store being compacted
+     * @param scanner  scanner over the compaction input
+     * @param scanType scan type of the compaction
+     * @param request  compaction request
+     * @return this default implementation returns {@code scanner} unchanged
+     * @throws IOException never thrown by this default; declared for overriding subclasses
+     */
+    public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> env,
+                                      Store store,
+                                      InternalScanner scanner,
+                                      ScanType scanType,
+                                      CompactionRequest request) throws IOException {
+        return scanner;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/846e5a58/hbase-shims/hbase-2/src/main/java/org/apache/omid/HBaseShims.java
----------------------------------------------------------------------
diff --git a/hbase-shims/hbase-2/src/main/java/org/apache/omid/HBaseShims.java b/hbase-shims/hbase-2/src/main/java/org/apache/omid/HBaseShims.java
new file mode 100644
index 0000000..3391649
--- /dev/null
+++ b/hbase-shims/hbase-2/src/main/java/org/apache/omid/HBaseShims.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.omid;
+
+import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import java.io.IOException;
+
+/**
+ * Static helpers of the HBase 2.x shim layer. Each method wraps one or a few HBase API calls
+ * so that callers do not have to reference the HBase 2.x API directly.
+ */
+public class HBaseShims {
+
+    /**
+     * Sets the sequence id of a key-value.
+     *
+     * @param kv         key-value to modify
+     * @param sequenceId sequence id to store in {@code kv}
+     */
+    public static void setKeyValueSequenceId(KeyValue kv, int sequenceId) {
+        kv.setSequenceId(sequenceId);
+    }
+
+    /**
+     * Returns the region exposed by a region coprocessor environment.
+     *
+     * @param env coprocessor environment
+     * @return the result of {@code env.getRegion()}
+     */
+    public static Region getRegionCoprocessorRegion(RegionCoprocessorEnvironment env) {
+        return env.getRegion();
+    }
+
+    /**
+     * Flushes every region that {@code regionServer.getRegions(tableName)} returns.
+     *
+     * @param regionServer region server hosting the regions
+     * @param tableName    table whose regions are flushed
+     * @throws IOException if flushing a region fails
+     */
+    public static void flushAllOnlineRegions(HRegionServer regionServer, TableName tableName)
+            throws IOException {
+        for (HRegion region : regionServer.getRegions(tableName)) {
+            region.flush(true);
+        }
+    }
+
+    /**
+     * Adds a column family to a table descriptor.
+     *
+     * @param tableDesc  descriptor that receives the family
+     * @param columnDesc family to add
+     */
+    public static void addFamilyToHTableDescriptor(HTableDescriptor tableDesc, HColumnDescriptor columnDesc) {
+        tableDesc.addFamily(columnDesc);
+    }
+
+    /**
+     * Returns the cell comparator obtained from {@link CellComparator#getInstance()}.
+     *
+     * @return the comparator instance
+     */
+    public static CellComparator cellComparatorInstance() {
+        return CellComparator.getInstance();
+    }
+
+    /**
+     * Reads a boolean attribute of the store's column family from the table descriptor of the
+     * region held by {@code env}.
+     *
+     * @param env         observer context giving access to the region and its table descriptor
+     * @param store       store whose column family name selects the family descriptor
+     * @param cfFlagValue name of the column-family attribute to read
+     * @return {@code true} only if the attribute's text equals "true" ignoring case;
+     *         {@code false} otherwise
+     */
+    public static boolean OmidCompactionEnabled(ObserverContext<RegionCoprocessorEnvironment> env,
+                                                Store store,
+                                                String cfFlagValue) {
+        TableDescriptor desc = env.getEnvironment().getRegion().getTableDescriptor();
+        ColumnFamilyDescriptor famDesc = desc.getColumnFamily(Bytes.toBytes(store.getColumnFamilyName()));
+        // parseBoolean returns the primitive directly; valueOf would box and then unbox.
+        return Boolean.parseBoolean(Bytes.toString(famDesc.getValue(Bytes.toBytes(cfFlagValue))));
+    }
+
+    /**
+     * Sets the attribute {@code key} to {@code value} on one column family of {@code table} and
+     * submits the modified family through {@link Admin#modifyColumnFamily}.
+     *
+     * @param conn         connection used to obtain an {@link Admin}, which is closed before returning
+     * @param table        table that owns the column family
+     * @param columnFamily name of the column family to modify
+     * @param key          attribute name
+     * @param value        attribute value
+     * @throws IOException              if an admin operation fails
+     * @throws IllegalArgumentException if the table descriptor has no such column family
+     */
+    public static void setCompaction(Connection conn, TableName table, byte[] columnFamily, String key, String value)
+            throws IOException {
+        try (Admin admin = conn.getAdmin()) {
+            TableDescriptor desc = admin.getDescriptor(table);
+            ColumnFamilyDescriptor cfDesc = desc.getColumnFamily(columnFamily);
+            if (cfDesc == null) {
+                // Fail with a descriptive message instead of an NPE from inside the builder.
+                throw new IllegalArgumentException(
+                        "Column family " + Bytes.toString(columnFamily) + " not found in table " + table);
+            }
+            ColumnFamilyDescriptorBuilder cfBuilder = ColumnFamilyDescriptorBuilder.newBuilder(cfDesc);
+            cfBuilder.setValue(Bytes.toBytes(key), Bytes.toBytes(value));
+            admin.modifyColumnFamily(table, cfBuilder.build());
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/846e5a58/hbase-shims/hbase-2/src/main/java/org/apache/omid/OmidBaseFilter.java
----------------------------------------------------------------------
diff --git a/hbase-shims/hbase-2/src/main/java/org/apache/omid/OmidBaseFilter.java b/hbase-shims/hbase-2/src/main/java/org/apache/omid/OmidBaseFilter.java
new file mode 100644
index 0000000..77494c6
--- /dev/null
+++ b/hbase-shims/hbase-2/src/main/java/org/apache/omid/OmidBaseFilter.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.omid;
+import org.apache.hadoop.hbase.filter.FilterBase;
+
+public abstract class OmidBaseFilter extends FilterBase {}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/846e5a58/hbase-shims/pom.xml
----------------------------------------------------------------------
diff --git a/hbase-shims/pom.xml b/hbase-shims/pom.xml
index 8c420c1..d4f9963 100644
--- a/hbase-shims/pom.xml
+++ b/hbase-shims/pom.xml
@@ -13,8 +13,8 @@
<name>Shims Aggregator for HBase</name>
<modules>
- <module>hbase-0</module>
<module>hbase-1</module>
+ <module>hbase-2</module>
</modules>
<dependencies>
@@ -42,6 +42,16 @@
<!-- end storage related -->
+ <!-- testing -->
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-testing-util</artifactId>
+ </dependency>
+
+
+
+ <!-- end testing related -->
+
<!-- distributed comm -->
<dependency>
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/846e5a58/hbase-tools/pom.xml
----------------------------------------------------------------------
diff --git a/hbase-tools/pom.xml b/hbase-tools/pom.xml
index 686551c..086e108 100644
--- a/hbase-tools/pom.xml
+++ b/hbase-tools/pom.xml
@@ -94,24 +94,26 @@
</build>
<profiles>
-
<profile>
- <id>hbase-0</id>
+ <id>hbase-1</id>
+ <activation>
+ <activeByDefault>true</activeByDefault>
+ </activation>
<dependencies>
<dependency>
<groupId>org.apache.omid</groupId>
- <artifactId>omid-hbase0-shims</artifactId>
+ <artifactId>omid-hbase1-shims</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</profile>
<profile>
- <id>hbase-1</id>
+ <id>hbase-2</id>
<dependencies>
<dependency>
<groupId>org.apache.omid</groupId>
- <artifactId>omid-hbase1-shims</artifactId>
+ <artifactId>omid-hbase2-shims</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/846e5a58/hbase-tools/src/test/java/org/apache/omid/tools/hbase/TestOmidTableManager.java
----------------------------------------------------------------------
diff --git a/hbase-tools/src/test/java/org/apache/omid/tools/hbase/TestOmidTableManager.java b/hbase-tools/src/test/java/org/apache/omid/tools/hbase/TestOmidTableManager.java
index ac89cfa..da04b12 100644
--- a/hbase-tools/src/test/java/org/apache/omid/tools/hbase/TestOmidTableManager.java
+++ b/hbase-tools/src/test/java/org/apache/omid/tools/hbase/TestOmidTableManager.java
@@ -43,7 +43,7 @@ public class TestOmidTableManager {
public void setUpClass() throws Exception {
// HBase setup
hbaseConf = HBaseConfiguration.create();
-
+ hbaseConf.setBoolean("hbase.localcluster.assign.random.ports",true);
hBaseTestUtil = new HBaseTestingUtility(hbaseConf);
hBaseTestUtil.startMiniCluster(1);
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/846e5a58/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 0a1910e..da70b8e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -131,11 +131,14 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<!-- Java Version -->
- <java.version>1.7</java.version>
+ <java.version>1.8</java.version>
<!-- 3rd-Party Library Versioning -->
<hbase0.version>0.98.10.1-hadoop1</hbase0.version>
<hbase1.version>1.2.5</hbase1.version>
+ <hbase2.version>2.0.1</hbase2.version>
+ <hadoop1.version>2.7.5</hadoop1.version>
+ <hadoop2.version>3.0.0</hadoop2.version>
<guava.version>14.0.1</guava.version>
<guice.version>3.0</guice.version>
<testng.version>6.10</testng.version>
@@ -538,21 +541,22 @@
</build>
</profile>
-
<profile>
- <id>hbase-0</id>
+ <id>hbase-1</id>
<activation>
<activeByDefault>true</activeByDefault>
</activation>
<properties>
- <hbase.version>${hbase0.version}</hbase.version>
+ <hadoop.version>${hadoop1.version}</hadoop.version>
+ <hbase.version>${hbase1.version}</hbase.version>
</properties>
</profile>
<profile>
- <id>hbase-1</id>
+ <id>hbase-2</id>
<properties>
- <hbase.version>${hbase1.version}</hbase.version>
+ <hadoop.version>${hadoop2.version}</hadoop.version>
+ <hbase.version>${hbase2.version}</hbase.version>
</properties>
</profile>
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/846e5a58/timestamp-storage/pom.xml
----------------------------------------------------------------------
diff --git a/timestamp-storage/pom.xml b/timestamp-storage/pom.xml
index 30a8571..a41343d 100644
--- a/timestamp-storage/pom.xml
+++ b/timestamp-storage/pom.xml
@@ -126,22 +126,22 @@
<profiles>
<profile>
- <id>hbase-0</id>
+ <id>hbase-1</id>
<dependencies>
<dependency>
<groupId>org.apache.omid</groupId>
- <artifactId>omid-hbase0-shims</artifactId>
+ <artifactId>omid-hbase1-shims</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</profile>
<profile>
- <id>hbase-1</id>
+ <id>hbase-2</id>
<dependencies>
<dependency>
<groupId>org.apache.omid</groupId>
- <artifactId>omid-hbase1-shims</artifactId>
+ <artifactId>omid-hbase2-shims</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/846e5a58/timestamp-storage/src/test/java/org/apache/omid/timestamp/storage/TestHBaseTimestampStorage.java
----------------------------------------------------------------------
diff --git a/timestamp-storage/src/test/java/org/apache/omid/timestamp/storage/TestHBaseTimestampStorage.java b/timestamp-storage/src/test/java/org/apache/omid/timestamp/storage/TestHBaseTimestampStorage.java
index b6ad054..86180ac 100644
--- a/timestamp-storage/src/test/java/org/apache/omid/timestamp/storage/TestHBaseTimestampStorage.java
+++ b/timestamp-storage/src/test/java/org/apache/omid/timestamp/storage/TestHBaseTimestampStorage.java
@@ -55,7 +55,7 @@ public class TestHBaseTimestampStorage {
public static void setUpClass() throws Exception {
// HBase setup
hbaseConf = HBaseConfiguration.create();
-
+ hbaseConf.setBoolean("hbase.localcluster.assign.random.ports",true);
LOG.info("Create hbase");
testutil = new HBaseTestingUtility(hbaseConf);
hbasecluster = testutil.startMiniCluster(1);
@@ -73,7 +73,7 @@ public class TestHBaseTimestampStorage {
public void setUp() throws Exception {
HBaseAdmin admin = testutil.getHBaseAdmin();
- if (!admin.tableExists(DEFAULT_TIMESTAMP_STORAGE_TABLE_NAME)) {
+ if (!admin.tableExists(TableName.valueOf(DEFAULT_TIMESTAMP_STORAGE_TABLE_NAME))) {
HTableDescriptor desc = new HTableDescriptor(TABLE_NAME);
HColumnDescriptor datafam = new HColumnDescriptor(DEFAULT_TIMESTAMP_STORAGE_CF_NAME);
datafam.setMaxVersions(Integer.MAX_VALUE);
@@ -82,8 +82,8 @@ public class TestHBaseTimestampStorage {
admin.createTable(desc);
}
- if (admin.isTableDisabled(DEFAULT_TIMESTAMP_STORAGE_TABLE_NAME)) {
- admin.enableTable(DEFAULT_TIMESTAMP_STORAGE_TABLE_NAME);
+ if (admin.isTableDisabled(TableName.valueOf(DEFAULT_TIMESTAMP_STORAGE_TABLE_NAME))) {
+ admin.enableTable(TableName.valueOf(DEFAULT_TIMESTAMP_STORAGE_TABLE_NAME));
}
HTableDescriptor[] tables = admin.listTables();
for (HTableDescriptor t : tables) {
@@ -96,10 +96,10 @@ public class TestHBaseTimestampStorage {
try {
LOG.info("tearing Down");
HBaseAdmin admin = testutil.getHBaseAdmin();
- admin.disableTable(DEFAULT_TIMESTAMP_STORAGE_TABLE_NAME);
- admin.deleteTable(DEFAULT_TIMESTAMP_STORAGE_TABLE_NAME);
+ admin.disableTable(TableName.valueOf(DEFAULT_TIMESTAMP_STORAGE_TABLE_NAME));
+ admin.deleteTable(TableName.valueOf(DEFAULT_TIMESTAMP_STORAGE_TABLE_NAME));
- } catch (Exception e) {
+ } catch (IOException e) {
LOG.error("Error tearing down", e);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/846e5a58/transaction-client/src/main/java/org/apache/omid/transaction/AbstractTransactionManager.java
----------------------------------------------------------------------
diff --git a/transaction-client/src/main/java/org/apache/omid/transaction/AbstractTransactionManager.java b/transaction-client/src/main/java/org/apache/omid/transaction/AbstractTransactionManager.java
index 573f631..e1d623d 100644
--- a/transaction-client/src/main/java/org/apache/omid/transaction/AbstractTransactionManager.java
+++ b/transaction-client/src/main/java/org/apache/omid/transaction/AbstractTransactionManager.java
@@ -39,10 +39,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
-import static org.apache.omid.committable.CommitTable.CommitTimestamp.Location.CACHE;
-import static org.apache.omid.committable.CommitTable.CommitTimestamp.Location.COMMIT_TABLE;
-import static org.apache.omid.committable.CommitTable.CommitTimestamp.Location.NOT_PRESENT;
-import static org.apache.omid.committable.CommitTable.CommitTimestamp.Location.SHADOW_CELL;
+
import static org.apache.omid.metrics.MetricsUtils.name;
/**
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/846e5a58/tso-server/pom.xml
----------------------------------------------------------------------
diff --git a/tso-server/pom.xml b/tso-server/pom.xml
index e1488c1..88bc016 100644
--- a/tso-server/pom.xml
+++ b/tso-server/pom.xml
@@ -272,24 +272,23 @@
</build>
<profiles>
-
<profile>
- <id>hbase-0</id>
+ <id>hbase-1</id>
<dependencies>
<dependency>
<groupId>org.apache.omid</groupId>
- <artifactId>omid-hbase0-shims</artifactId>
+ <artifactId>omid-hbase1-shims</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</profile>
<profile>
- <id>hbase-1</id>
+ <id>hbase-2</id>
<dependencies>
<dependency>
<groupId>org.apache.omid</groupId>
- <artifactId>omid-hbase1-shims</artifactId>
+ <artifactId>omid-hbase2-shims</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
[2/2] incubator-omid git commit: [OMID-110] - support hbase-2
Posted by oh...@apache.org.
[OMID-110] - support hbase-2
Signed-off-by: Ohad Shacham <oh...@yahoo-inc.com>
Project: http://git-wip-us.apache.org/repos/asf/incubator-omid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-omid/commit/846e5a58
Tree: http://git-wip-us.apache.org/repos/asf/incubator-omid/tree/846e5a58
Diff: http://git-wip-us.apache.org/repos/asf/incubator-omid/diff/846e5a58
Branch: refs/heads/phoenix-integration
Commit: 846e5a587ae69652af29cc7511564c22edfd6584
Parents: 75dc817
Author: Yonatan Gottesman <yo...@gmail.com>
Authored: Sun Sep 16 20:49:21 2018 +0300
Committer: Ohad Shacham <oh...@yahoo-inc.com>
Committed: Mon Sep 17 09:49:12 2018 +0300
----------------------------------------------------------------------
.travis.yml | 4 +-
doc/site/markdown/quickstart.md | 6 +-
examples/pom.xml | 9 +-
.../omid/examples/SnapshotIsolationExample.java | 6 +-
hbase-client/pom.xml | 8 +-
.../apache/omid/transaction/OmidTestBase.java | 10 +-
.../TestEndToEndScenariosWithHA.java | 2 +-
.../omid/transaction/TestShadowCells.java | 5 +-
hbase-commit-table/pom.xml | 8 +-
.../committable/hbase/TestHBaseCommitTable.java | 1 +
hbase-common/pom.xml | 8 +-
.../org/apache/omid/transaction/CellUtils.java | 4 +-
hbase-coprocessor/pom.xml | 13 +-
.../apache/omid/transaction/CellSkipFilter.java | 137 -----------
.../omid/transaction/CellSkipFilterBase.java | 136 ++++++++++
.../apache/omid/transaction/CompactorUtil.java | 47 ++--
.../apache/omid/transaction/OmidCompactor.java | 20 +-
.../omid/transaction/OmidSnapshotFilter.java | 44 ++--
.../omid/transaction/TransactionFilters.java | 2 +-
.../TransactionVisibilityFilter.java | 245 -------------------
.../TransactionVisibilityFilterBase.java | 239 ++++++++++++++++++
.../TSOForHBaseCompactorTestModule.java | 1 +
.../TSOForSnapshotFilterTestModule.java | 2 +-
.../apache/omid/transaction/TestCompaction.java | 25 +-
.../omid/transaction/TestSnapshotFilter.java | 29 ++-
hbase-shims/hbase-0/pom.xml | 19 --
.../hadoop/hbase/regionserver/Region.java | 54 ----
.../hbase/regionserver/ScannerContext.java | 28 ---
.../main/java/org/apache/omid/HBaseShims.java | 59 -----
.../main/java/org/apache/omid/HBaseShims.java | 31 +++
.../java/org/apache/omid/OmidFilterBase.java | 37 +++
hbase-shims/hbase-2/pom.xml | 28 +++
.../hbase/coprocessor/BaseRegionObserver.java | 47 ++++
.../main/java/org/apache/omid/HBaseShims.java | 92 +++++++
.../java/org/apache/omid/OmidBaseFilter.java | 21 ++
hbase-shims/pom.xml | 12 +-
hbase-tools/pom.xml | 12 +-
.../omid/tools/hbase/TestOmidTableManager.java | 2 +-
pom.xml | 16 +-
timestamp-storage/pom.xml | 8 +-
.../storage/TestHBaseTimestampStorage.java | 14 +-
.../transaction/AbstractTransactionManager.java | 5 +-
tso-server/pom.xml | 9 +-
43 files changed, 824 insertions(+), 681 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/846e5a58/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index b525efe..c16c831 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -36,9 +36,9 @@ script:
- if [ "${TRAVIS_PULL_REQUEST}" = "false" ] ; then
git checkout master
&&
- mvn clean cobertura:cobertura coveralls:report -Phbase-0 ;
+ mvn clean cobertura:cobertura coveralls:report -Phbase-1 ;
else
git checkout -b tmp-build-branch
&&
- mvn clean test -Phbase-0 ;
+ mvn clean test -Phbase-1 ;
fi
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/846e5a58/doc/site/markdown/quickstart.md
----------------------------------------------------------------------
diff --git a/doc/site/markdown/quickstart.md b/doc/site/markdown/quickstart.md
index 8a1d073..38647c3 100644
--- a/doc/site/markdown/quickstart.md
+++ b/doc/site/markdown/quickstart.md
@@ -18,11 +18,11 @@ Then start HBase in [standalone mode](https://hbase.apache.org/book.html#quickst
### 2. Clone the [Omid repository](https://github.com/apache/incubator-omid) and Build the TSO Package:
```sh
-$ git clone git@github.com:yahoo/omid.git
+$ git clone git@github.com:apache/incubator-omid.git
$ cd incubator-omid
-$ mvn clean install -Phbase-0 (for HBase 0.x versions)
-or
$ mvn clean install -Phbase-1 (for HBase 1.x versions)
+or
+$ mvn clean install -Phbase-2 (for HBase 2.x versions)
```
This will generate a binary package containing all dependencies for the TSO in tso-server/target/tso-server-\<VERSION\>-bin.tar.gz.
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/846e5a58/examples/pom.xml
----------------------------------------------------------------------
diff --git a/examples/pom.xml b/examples/pom.xml
index 951a9de..62e46af 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -118,24 +118,23 @@
</build>
<profiles>
-
<profile>
- <id>hbase-0</id>
+ <id>hbase-1</id>
<dependencies>
<dependency>
<groupId>org.apache.omid</groupId>
- <artifactId>omid-hbase0-shims</artifactId>
+ <artifactId>omid-hbase1-shims</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</profile>
<profile>
- <id>hbase-1</id>
+ <id>hbase-2</id>
<dependencies>
<dependency>
<groupId>org.apache.omid</groupId>
- <artifactId>omid-hbase1-shims</artifactId>
+ <artifactId>omid-hbase2-shims</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/846e5a58/examples/src/main/java/org/apache/omid/examples/SnapshotIsolationExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/omid/examples/SnapshotIsolationExample.java b/examples/src/main/java/org/apache/omid/examples/SnapshotIsolationExample.java
index b68e19b..2b5ed84 100644
--- a/examples/src/main/java/org/apache/omid/examples/SnapshotIsolationExample.java
+++ b/examples/src/main/java/org/apache/omid/examples/SnapshotIsolationExample.java
@@ -128,7 +128,7 @@ public class SnapshotIsolationExample {
Transaction tx0 = tm.begin();
byte[] rowId = rowIdGenerator.getRowId();
Put initialPut = new Put(rowId);
- initialPut.add(family, qualifier, initialData);
+ initialPut.addColumn(family, qualifier, initialData);
txTable.put(tx0, initialPut);
tm.commit(tx0);
LOG.info("Initial Transaction {} COMMITTED. Base value written in {}:{}/{}/{} = {}",
@@ -139,7 +139,7 @@ public class SnapshotIsolationExample {
Transaction tx1 = tm.begin();
LOG.info("Transaction {} STARTED", tx1);
Put tx1Put = new Put(rowId);
- tx1Put.add(family, qualifier, dataValue1);
+ tx1Put.addColumn(family, qualifier, dataValue1);
txTable.put(tx1, tx1Put);
LOG.info("Transaction {} updates base value in {}:{}/{}/{} = {} in its own Snapshot",
tx1, userTableName, Bytes.toString(rowId), Bytes.toString(family),
@@ -178,7 +178,7 @@ public class SnapshotIsolationExample {
// Tx2 tries to write the column written by the committed concurrent transaction Tx1...
Put tx2Put = new Put(rowId);
- tx2Put.add(family, qualifier, dataValue2);
+ tx2Put.addColumn(family, qualifier, dataValue2);
txTable.put(tx2, tx2Put);
LOG.info(
"Concurrent Transaction {} updates {}:{}/{}/{} = {} in its own Snapshot (Will conflict with {} at commit time)",
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/846e5a58/hbase-client/pom.xml
----------------------------------------------------------------------
diff --git a/hbase-client/pom.xml b/hbase-client/pom.xml
index d231d0c..f8bf203 100644
--- a/hbase-client/pom.xml
+++ b/hbase-client/pom.xml
@@ -160,22 +160,22 @@
<profiles>
<profile>
- <id>hbase-0</id>
+ <id>hbase-1</id>
<dependencies>
<dependency>
<groupId>org.apache.omid</groupId>
- <artifactId>omid-hbase0-shims</artifactId>
+ <artifactId>omid-hbase1-shims</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</profile>
<profile>
- <id>hbase-1</id>
+ <id>hbase-2</id>
<dependencies>
<dependency>
<groupId>org.apache.omid</groupId>
- <artifactId>omid-hbase1-shims</artifactId>
+ <artifactId>omid-hbase2-shims</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/846e5a58/hbase-client/src/test/java/org/apache/omid/transaction/OmidTestBase.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/omid/transaction/OmidTestBase.java b/hbase-client/src/test/java/org/apache/omid/transaction/OmidTestBase.java
index 79f02eb..3bbcc6e 100644
--- a/hbase-client/src/test/java/org/apache/omid/transaction/OmidTestBase.java
+++ b/hbase-client/src/test/java/org/apache/omid/transaction/OmidTestBase.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.omid.HBaseShims;
import org.apache.omid.TestUtils;
import org.apache.omid.committable.CommitTable;
import org.apache.omid.committable.InMemoryCommitTable;
@@ -121,14 +122,13 @@ public abstract class OmidTestBase {
File tempFile = File.createTempFile("OmidTest", "");
tempFile.deleteOnExit();
hbaseConf.set("hbase.rootdir", tempFile.getAbsolutePath());
-
+ hbaseConf.setBoolean("hbase.localcluster.assign.random.ports",true);
hBaseUtils = new HBaseTestingUtility(hbaseConf);
hbaseCluster = hBaseUtils.startMiniCluster(1);
connection = ConnectionFactory.createConnection(hbaseConf);
- hBaseUtils.createTable(Bytes.toBytes(hBaseTimestampStorageConfig.getTableName()),
- new byte[][]{hBaseTimestampStorageConfig.getFamilyName().getBytes()},
- Integer.MAX_VALUE);
-
+ hBaseUtils.createTable(TableName.valueOf(hBaseTimestampStorageConfig.getTableName()),
+ new byte[][]{hBaseTimestampStorageConfig.getFamilyName().getBytes()},
+ Integer.MAX_VALUE);
createTestTable();
createCommitTable();
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/846e5a58/hbase-client/src/test/java/org/apache/omid/transaction/TestEndToEndScenariosWithHA.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/omid/transaction/TestEndToEndScenariosWithHA.java b/hbase-client/src/test/java/org/apache/omid/transaction/TestEndToEndScenariosWithHA.java
index 71c9508..95d7fcc 100644
--- a/hbase-client/src/test/java/org/apache/omid/transaction/TestEndToEndScenariosWithHA.java
+++ b/hbase-client/src/test/java/org/apache/omid/transaction/TestEndToEndScenariosWithHA.java
@@ -162,7 +162,7 @@ public class TestEndToEndScenariosWithHA extends OmidTestBase {
LOG.info("Cleanup");
HBaseAdmin admin = hBaseUtils.getHBaseAdmin();
deleteTable(admin, TableName.valueOf(DEFAULT_TIMESTAMP_STORAGE_TABLE_NAME));
- hBaseUtils.createTable(Bytes.toBytes(DEFAULT_TIMESTAMP_STORAGE_TABLE_NAME),
+ hBaseUtils.createTable(TableName.valueOf((DEFAULT_TIMESTAMP_STORAGE_TABLE_NAME)),
new byte[][]{DEFAULT_TIMESTAMP_STORAGE_CF_NAME.getBytes()},
Integer.MAX_VALUE);
tso1.stopAndWait();
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/846e5a58/hbase-client/src/test/java/org/apache/omid/transaction/TestShadowCells.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/omid/transaction/TestShadowCells.java b/hbase-client/src/test/java/org/apache/omid/transaction/TestShadowCells.java
index 42fd406..02e7ef5 100644
--- a/hbase-client/src/test/java/org/apache/omid/transaction/TestShadowCells.java
+++ b/hbase-client/src/test/java/org/apache/omid/transaction/TestShadowCells.java
@@ -41,6 +41,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
@@ -267,7 +268,7 @@ public class TestShadowCells extends OmidTestBase {
public ListenableFuture<Void> answer(InvocationOnMock invocation) throws Throwable {
table.flushCommits();
HBaseAdmin admin = hBaseUtils.getHBaseAdmin();
- admin.disableTable(table.getTableName());
+ admin.disableTable(TableName.valueOf(table.getTableName()));
return (ListenableFuture<Void>) invocation.callRealMethod();
}
}).when(syncPostCommitter).updateShadowCells(any(HBaseTransaction.class));
@@ -282,7 +283,7 @@ public class TestShadowCells extends OmidTestBase {
// Re-enable table to allow the required checks below
HBaseAdmin admin = hBaseUtils.getHBaseAdmin();
- admin.enableTable(table.getTableName());
+ admin.enableTable(TableName.valueOf(table.getTableName()));
// 1) check that shadow cell is not created...
assertTrue(hasCell(row, family, qualifier, tx.getStartTimestamp(), new TTableCellGetterAdapter(table)),
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/846e5a58/hbase-commit-table/pom.xml
----------------------------------------------------------------------
diff --git a/hbase-commit-table/pom.xml b/hbase-commit-table/pom.xml
index 346b679..c432536 100644
--- a/hbase-commit-table/pom.xml
+++ b/hbase-commit-table/pom.xml
@@ -139,22 +139,22 @@
<profiles>
<profile>
- <id>hbase-0</id>
+ <id>hbase-1</id>
<dependencies>
<dependency>
<groupId>org.apache.omid</groupId>
- <artifactId>omid-hbase0-shims</artifactId>
+ <artifactId>omid-hbase1-shims</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</profile>
<profile>
- <id>hbase-1</id>
+ <id>hbase-2</id>
<dependencies>
<dependency>
<groupId>org.apache.omid</groupId>
- <artifactId>omid-hbase1-shims</artifactId>
+ <artifactId>omid-hbase2-shims</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/846e5a58/hbase-commit-table/src/test/java/org/apache/omid/committable/hbase/TestHBaseCommitTable.java
----------------------------------------------------------------------
diff --git a/hbase-commit-table/src/test/java/org/apache/omid/committable/hbase/TestHBaseCommitTable.java b/hbase-commit-table/src/test/java/org/apache/omid/committable/hbase/TestHBaseCommitTable.java
index 5bb860d..d684d18 100644
--- a/hbase-commit-table/src/test/java/org/apache/omid/committable/hbase/TestHBaseCommitTable.java
+++ b/hbase-commit-table/src/test/java/org/apache/omid/committable/hbase/TestHBaseCommitTable.java
@@ -74,6 +74,7 @@ public class TestHBaseCommitTable {
public void setUpClass() throws Exception {
// HBase setup
hbaseConf = HBaseConfiguration.create();
+ hbaseConf.setBoolean("hbase.localcluster.assign.random.ports",true);
DefaultHBaseCommitTableStorageModule module = new DefaultHBaseCommitTableStorageModule();
commitTableFamily = module.getFamilyName().getBytes();
lowWatermarkFamily = module.getLowWatermarkFamily().getBytes();
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/846e5a58/hbase-common/pom.xml
----------------------------------------------------------------------
diff --git a/hbase-common/pom.xml b/hbase-common/pom.xml
index d623dbc..91abde6 100644
--- a/hbase-common/pom.xml
+++ b/hbase-common/pom.xml
@@ -80,22 +80,22 @@
<profiles>
<profile>
- <id>hbase-0</id>
+ <id>hbase-1</id>
<dependencies>
<dependency>
<groupId>org.apache.omid</groupId>
- <artifactId>omid-hbase0-shims</artifactId>
+ <artifactId>omid-hbase1-shims</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</profile>
<profile>
- <id>hbase-1</id>
+ <id>hbase-2</id>
<dependencies>
<dependency>
<groupId>org.apache.omid</groupId>
- <artifactId>omid-hbase1-shims</artifactId>
+ <artifactId>omid-hbase2-shims</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/846e5a58/hbase-common/src/main/java/org/apache/omid/transaction/CellUtils.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/omid/transaction/CellUtils.java b/hbase-common/src/main/java/org/apache/omid/transaction/CellUtils.java
index df22e70..61a3492 100644
--- a/hbase-common/src/main/java/org/apache/omid/transaction/CellUtils.java
+++ b/hbase-common/src/main/java/org/apache/omid/transaction/CellUtils.java
@@ -26,13 +26,13 @@ import java.util.SortedMap;
import java.util.TreeMap;
import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.omid.HBaseShims;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -304,7 +304,7 @@ public final class CellUtils {
// Move CellComparator to HBaseSims for 2.0 support
// Need to access through CellComparatorImpl.COMPARATOR
SortedMap<Cell, Optional<Cell>> cellToShadowCellMap
- = new TreeMap<Cell, Optional<Cell>>(new CellComparator());
+ = new TreeMap<Cell, Optional<Cell>>(HBaseShims.cellComparatorInstance());
Map<CellId, Cell> cellIdToCellMap = new HashMap<CellId, Cell>();
for (Cell cell : cells) {
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/846e5a58/hbase-coprocessor/pom.xml
----------------------------------------------------------------------
diff --git a/hbase-coprocessor/pom.xml b/hbase-coprocessor/pom.xml
index e9eab4f..de5f565 100644
--- a/hbase-coprocessor/pom.xml
+++ b/hbase-coprocessor/pom.xml
@@ -52,7 +52,6 @@
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
</dependency>
-
<!-- end storage related -->
<!-- utils -->
@@ -97,7 +96,6 @@
<artifactId>hbase-testing-util</artifactId>
<scope>test</scope>
</dependency>
-
<!-- end testing -->
</dependencies>
@@ -144,22 +142,25 @@
<profiles>
<profile>
- <id>hbase-0</id>
+ <id>hbase-1</id>
+ <activation>
+ <activeByDefault>true</activeByDefault>
+ </activation>
<dependencies>
<dependency>
<groupId>org.apache.omid</groupId>
- <artifactId>omid-hbase0-shims</artifactId>
+ <artifactId>omid-hbase1-shims</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</profile>
<profile>
- <id>hbase-1</id>
+ <id>hbase-2</id>
<dependencies>
<dependency>
<groupId>org.apache.omid</groupId>
- <artifactId>omid-hbase1-shims</artifactId>
+ <artifactId>omid-hbase2-shims</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/846e5a58/hbase-coprocessor/src/main/java/org/apache/omid/transaction/CellSkipFilter.java
----------------------------------------------------------------------
diff --git a/hbase-coprocessor/src/main/java/org/apache/omid/transaction/CellSkipFilter.java b/hbase-coprocessor/src/main/java/org/apache/omid/transaction/CellSkipFilter.java
deleted file mode 100644
index dbad494..0000000
--- a/hbase-coprocessor/src/main/java/org/apache/omid/transaction/CellSkipFilter.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.omid.transaction;
-
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.filter.Filter;
-import org.apache.hadoop.hbase.filter.FilterBase;
-import org.apache.hadoop.hbase.util.Bytes;
-
-import java.io.IOException;
-import java.util.List;
-
-/**
- * {@link Filter} that encapsulates another {@link Filter}. It remembers the last {@link KeyValue}
- * for which the underlying filter returned the {@link ReturnCode#NEXT_COL} or {@link ReturnCode#INCLUDE_AND_NEXT_COL},
- * so that when {@link #filterKeyValue} is called again for the same {@link KeyValue} with different
- * version, it returns {@link ReturnCode#NEXT_COL} directly without consulting the underlying {@link Filter}.
- * Please see TEPHRA-169 for more details.
- */
-
-public class CellSkipFilter extends FilterBase {
- private final Filter filter;
- // remember the previous keyvalue processed by filter when the return code was NEXT_COL or INCLUDE_AND_NEXT_COL
- private KeyValue skipColumn = null;
-
- public CellSkipFilter(Filter filter) {
- this.filter = filter;
- }
-
- /**
- * Determines whether the current cell should be skipped. The cell will be skipped
- * if the previous keyvalue had the same key as the current cell. This means filter already responded
- * for the previous keyvalue with ReturnCode.NEXT_COL or ReturnCode.INCLUDE_AND_NEXT_COL.
- * @param cell the {@link Cell} to be tested for skipping
- * @return true is current cell should be skipped, false otherwise
- */
- private boolean skipCellVersion(Cell cell) {
- return skipColumn != null
- && Bytes.equals(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(),
- skipColumn.getRowArray(), skipColumn.getRowOffset(), skipColumn.getRowLength())
- && Bytes.equals(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
- skipColumn.getFamilyArray(), skipColumn.getFamilyOffset(), skipColumn.getFamilyLength())
- && Bytes.equals(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(),
- skipColumn.getQualifierArray(), skipColumn.getQualifierOffset(), skipColumn.getQualifierLength());
- }
-
- @Override
- public ReturnCode filterKeyValue(Cell cell) throws IOException {
- if (skipCellVersion(cell)) {
- return ReturnCode.NEXT_COL;
- }
-
- ReturnCode code = filter.filterKeyValue(cell);
- if (code == ReturnCode.NEXT_COL || code == ReturnCode.INCLUDE_AND_NEXT_COL) {
- // only store the reference to the keyvalue if we are returning NEXT_COL or INCLUDE_AND_NEXT_COL
- skipColumn = new KeyValue(cell.getRowArray(),cell.getRowOffset(),cell.getRowLength(),cell.getFamilyArray(),
- cell.getFamilyOffset(),cell.getFamilyLength(),cell.getQualifierArray(),
- cell.getQualifierOffset(), cell.getQualifierLength(),9223372036854775807L,
- KeyValue.Type.Maximum, (byte[])null, 0, 0);
- } else {
- skipColumn = null;
- }
- return code;
- }
-
- @Override
- public boolean filterRow() throws IOException {
- return filter.filterRow();
- }
-
- @Override
- public Cell transformCell(Cell cell) throws IOException {
- return filter.transformCell(cell);
- }
-
- @Override
- public void reset() throws IOException {
- filter.reset();
- }
-
- @Override
- public boolean filterRowKey(byte[] buffer, int offset, int length) throws IOException {
- return filter.filterRowKey(buffer, offset, length);
- }
-
- @Override
- public boolean filterAllRemaining() throws IOException {
- return filter.filterAllRemaining();
- }
-
- @Override
- public void filterRowCells(List<Cell> kvs) throws IOException {
- filter.filterRowCells(kvs);
- }
-
- @Override
- public boolean hasFilterRow() {
- return filter.hasFilterRow();
- }
-
- @SuppressWarnings("deprecation")
- @Override
- public KeyValue getNextKeyHint(KeyValue currentKV) throws IOException {
- return filter.getNextKeyHint(currentKV);
- }
-
- @Override
- public Cell getNextCellHint(Cell currentKV) throws IOException {
- return filter.getNextCellHint(currentKV);
- }
-
- @Override
- public boolean isFamilyEssential(byte[] name) throws IOException {
- return filter.isFamilyEssential(name);
- }
-
- @Override
- public byte[] toByteArray() throws IOException {
- return filter.toByteArray();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/846e5a58/hbase-coprocessor/src/main/java/org/apache/omid/transaction/CellSkipFilterBase.java
----------------------------------------------------------------------
diff --git a/hbase-coprocessor/src/main/java/org/apache/omid/transaction/CellSkipFilterBase.java b/hbase-coprocessor/src/main/java/org/apache/omid/transaction/CellSkipFilterBase.java
new file mode 100644
index 0000000..4262a23
--- /dev/null
+++ b/hbase-coprocessor/src/main/java/org/apache/omid/transaction/CellSkipFilterBase.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.omid.transaction;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.omid.OmidFilterBase;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * {@link Filter} that encapsulates another {@link Filter}. It remembers the column (row, family and qualifier)
+ * of the last cell for which the underlying filter returned {@link ReturnCode#NEXT_COL} or
+ * {@link ReturnCode#INCLUDE_AND_NEXT_COL}, so that when {@link #filterKeyValue} is called again for another version
+ * of that same column, it returns {@link ReturnCode#NEXT_COL} directly without consulting the underlying
+ * {@link Filter}. The other overridden callbacks simply delegate to it. Please see TEPHRA-169 for more details.
+ */
+
+public class CellSkipFilterBase extends OmidFilterBase {
+ private final Filter filter;
+ // column of the last cell the wrapped filter answered with NEXT_COL or INCLUDE_AND_NEXT_COL; null otherwise
+ private KeyValue skipColumn = null;
+
+ public CellSkipFilterBase(Filter filter) {
+ this.filter = filter;
+ }
+
+ /**
+ * Determines whether the current cell should be skipped. The cell will be skipped
+ * if it has the same row, family and qualifier as the remembered column. This means the filter already
+ * responded for a previous cell of that column with ReturnCode.NEXT_COL or ReturnCode.INCLUDE_AND_NEXT_COL.
+ * @param cell the {@link Cell} to be tested for skipping
+ * @return true if the current cell should be skipped, false otherwise
+ */
+ private boolean skipCellVersion(Cell cell) {
+ return skipColumn != null
+ && CellUtil.matchingRow(cell, skipColumn.getRowArray(), skipColumn.getRowOffset(),
+ skipColumn.getRowLength())
+ && CellUtil.matchingFamily(cell, skipColumn.getFamilyArray(), skipColumn.getFamilyOffset(),
+ skipColumn.getFamilyLength())
+ && CellUtil.matchingQualifier(cell, skipColumn.getQualifierArray(), skipColumn.getQualifierOffset(),
+ skipColumn.getQualifierLength());
+ }
+
+ @Override
+ public ReturnCode filterKeyValue(Cell cell) throws IOException {
+ if (skipCellVersion(cell)) {
+ return ReturnCode.NEXT_COL;
+ }
+
+ ReturnCode code = filter.filterKeyValue(cell);
+ if (code == ReturnCode.NEXT_COL || code == ReturnCode.INCLUDE_AND_NEXT_COL) {
+ // remember this cell's column (as a newly built key) only when returning NEXT_COL or INCLUDE_AND_NEXT_COL
+ skipColumn = KeyValueUtil.createFirstOnRow(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(),
+ cell.getFamilyArray(), cell.getFamilyOffset(),
+ cell.getFamilyLength(), cell.getQualifierArray(),
+ cell.getQualifierOffset(), cell.getQualifierLength());
+ } else {
+ skipColumn = null;
+ }
+ return code;
+ }
+
+ @Override
+ public boolean filterRow() throws IOException {
+ return filter.filterRow();
+ }
+
+ @Override
+ public Cell transformCell(Cell cell) throws IOException {
+ return filter.transformCell(cell);
+ }
+
+ @Override
+ public void reset() throws IOException {
+ filter.reset();
+ }
+
+ @Override
+ public boolean filterRowKey(byte[] buffer, int offset, int length) throws IOException {
+ return filter.filterRowKey(buffer, offset, length);
+ }
+
+ @Override
+ public boolean filterAllRemaining() throws IOException {
+ return filter.filterAllRemaining();
+ }
+
+ @Override
+ public void filterRowCells(List<Cell> kvs) throws IOException {
+ filter.filterRowCells(kvs);
+ }
+
+ @Override
+ public boolean hasFilterRow() {
+ return filter.hasFilterRow();
+ }
+
+ @Override
+ public Cell getNextCellHint(Cell currentKV) throws IOException {
+ return filter.getNextCellHint(currentKV);
+ }
+
+ @Override
+ public boolean isFamilyEssential(byte[] name) throws IOException {
+ return filter.isFamilyEssential(name);
+ }
+
+ @Override
+ public byte[] toByteArray() throws IOException {
+ return filter.toByteArray();
+ }
+
+ public Filter getInnerFilter() {
+ return filter;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/846e5a58/hbase-coprocessor/src/main/java/org/apache/omid/transaction/CompactorUtil.java
----------------------------------------------------------------------
diff --git a/hbase-coprocessor/src/main/java/org/apache/omid/transaction/CompactorUtil.java b/hbase-coprocessor/src/main/java/org/apache/omid/transaction/CompactorUtil.java
index e7640a7..f95191c 100644
--- a/hbase-coprocessor/src/main/java/org/apache/omid/transaction/CompactorUtil.java
+++ b/hbase-coprocessor/src/main/java/org/apache/omid/transaction/CompactorUtil.java
@@ -20,46 +20,37 @@ package org.apache.omid.transaction;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParametersDelegate;
+
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+
+import org.apache.omid.HBaseShims;
import org.apache.omid.tools.hbase.HBaseLogin;
import org.apache.omid.tools.hbase.SecureHBaseConfig;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HTableDescriptor;
+
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
public class CompactorUtil {
- public static void enableOmidCompaction(Configuration conf,
+ public static void setOmidCompaction(Connection conn, TableName table, byte[] columnFamily, String value)
+ throws IOException {
+ HBaseShims.setCompaction(conn, table, columnFamily, OmidCompactor.OMID_COMPACTABLE_CF_FLAG, value);
+ }
+
+ public static void enableOmidCompaction(Connection conn,
TableName table, byte[] columnFamily) throws IOException {
- HBaseAdmin admin = new HBaseAdmin(conf);
- try {
- HTableDescriptor desc = admin.getTableDescriptor(table);
- HColumnDescriptor cfDesc = desc.getFamily(columnFamily);
- cfDesc.setValue(OmidCompactor.OMID_COMPACTABLE_CF_FLAG,
- Boolean.TRUE.toString());
- admin.modifyColumn(table, cfDesc);
- } finally {
- admin.close();
- }
+
+ setOmidCompaction(conn, table, columnFamily, Boolean.TRUE.toString());
}
- public static void disableOmidCompaction(Configuration conf,
+ public static void disableOmidCompaction(Connection conn,
TableName table, byte[] columnFamily) throws IOException {
- HBaseAdmin admin = new HBaseAdmin(conf);
- try {
- HTableDescriptor desc = admin.getTableDescriptor(table);
- HColumnDescriptor cfDesc = desc.getFamily(columnFamily);
- cfDesc.setValue(OmidCompactor.OMID_COMPACTABLE_CF_FLAG,
- Boolean.FALSE.toString());
- admin.modifyColumn(table, cfDesc);
- } finally {
- admin.close();
- }
+ setOmidCompaction(conn, table, columnFamily, Boolean.FALSE.toString());
}
static class Config {
@@ -94,11 +85,13 @@ public class CompactorUtil {
HBaseLogin.loginIfNeeded(cmdline.loginFlags);
Configuration conf = HBaseConfiguration.create();
+ Connection conn = ConnectionFactory.createConnection(conf);
+
if (cmdline.enable) {
- enableOmidCompaction(conf, TableName.valueOf(cmdline.table),
+ enableOmidCompaction(conn, TableName.valueOf(cmdline.table),
Bytes.toBytes(cmdline.columnFamily));
} else if (cmdline.disable) {
- disableOmidCompaction(conf, TableName.valueOf(cmdline.table),
+ disableOmidCompaction(conn, TableName.valueOf(cmdline.table),
Bytes.toBytes(cmdline.columnFamily));
} else {
System.err.println("Must specify enable or disable");
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/846e5a58/hbase-coprocessor/src/main/java/org/apache/omid/transaction/OmidCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-coprocessor/src/main/java/org/apache/omid/transaction/OmidCompactor.java b/hbase-coprocessor/src/main/java/org/apache/omid/transaction/OmidCompactor.java
index e839187..97dbea3 100644
--- a/hbase-coprocessor/src/main/java/org/apache/omid/transaction/OmidCompactor.java
+++ b/hbase-coprocessor/src/main/java/org/apache/omid/transaction/OmidCompactor.java
@@ -19,14 +19,14 @@ package org.apache.omid.transaction;
import com.google.common.annotations.VisibleForTesting;
+
import org.apache.omid.committable.CommitTable;
import org.apache.omid.committable.hbase.HBaseCommitTable;
import org.apache.omid.committable.hbase.HBaseCommitTableConfig;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.DoNotRetryIOException;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.omid.HBaseShims;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
@@ -35,11 +35,12 @@ import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
-import org.apache.hadoop.hbase.util.Bytes;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -78,6 +79,10 @@ public class OmidCompactor extends BaseRegionObserver {
this(false);
}
+ public Optional getRegionObserver() {
+ return Optional.of(this);
+ }
+
public OmidCompactor(boolean enableCompactorForAllFamilies) {
LOG.info("Compactor coprocessor initialized");
this.enableCompactorForAllFamilies = enableCompactorForAllFamilies;
@@ -109,7 +114,8 @@ public class OmidCompactor extends BaseRegionObserver {
LOG.info("Compactor coprocessor stopped");
}
- @Override
+
+
public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> env,
Store store,
InternalScanner scanner,
@@ -120,10 +126,8 @@ public class OmidCompactor extends BaseRegionObserver {
if (enableCompactorForAllFamilies) {
omidCompactable = true;
} else {
- HTableDescriptor desc = env.getEnvironment().getRegion().getTableDesc();
- HColumnDescriptor famDesc
- = desc.getFamily(Bytes.toBytes(store.getColumnFamilyName()));
- omidCompactable = Boolean.valueOf(famDesc.getValue(OMID_COMPACTABLE_CF_FLAG));
+
+ omidCompactable = HBaseShims.OmidCompactionEnabled(env, store, OMID_COMPACTABLE_CF_FLAG);
}
// only column families tagged as compactable are compacted
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/846e5a58/hbase-coprocessor/src/main/java/org/apache/omid/transaction/OmidSnapshotFilter.java
----------------------------------------------------------------------
diff --git a/hbase-coprocessor/src/main/java/org/apache/omid/transaction/OmidSnapshotFilter.java b/hbase-coprocessor/src/main/java/org/apache/omid/transaction/OmidSnapshotFilter.java
index cd68968..3c5f3c6 100644
--- a/hbase-coprocessor/src/main/java/org/apache/omid/transaction/OmidSnapshotFilter.java
+++ b/hbase-coprocessor/src/main/java/org/apache/omid/transaction/OmidSnapshotFilter.java
@@ -20,10 +20,14 @@ package org.apache.omid.transaction;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.hadoop.hbase.client.Scan;
+
+import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
+
+
import org.apache.omid.committable.CommitTable;
import org.apache.omid.committable.hbase.HBaseCommitTable;
import org.apache.omid.committable.hbase.HBaseCommitTableConfig;
@@ -34,7 +38,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.RegionAccessWrapper;
@@ -46,6 +49,7 @@ import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -74,8 +78,13 @@ public class OmidSnapshotFilter extends BaseRegionObserver {
LOG.info("Compactor coprocessor initialized via empty constructor");
}
+
+ public Optional getRegionObserver() {
+ return Optional.of(this);
+ }
+
@Override
- public void start(CoprocessorEnvironment env) throws IOException {
+ public void start(CoprocessorEnvironment env) {
LOG.info("Starting snapshot filter coprocessor");
conf = env.getConfiguration();
commitTableConf = new HBaseCommitTableConfig();
@@ -99,8 +108,7 @@ public class OmidSnapshotFilter extends BaseRegionObserver {
@Override
- public void postGetOp(ObserverContext<RegionCoprocessorEnvironment> e, Get get, List<Cell> results)
- throws IOException {
+ public void postGetOp(ObserverContext<RegionCoprocessorEnvironment> e, Get get, List<Cell> results) {
SnapshotFilterImpl snapshotFilter = snapshotFilterMap.get(get);
if (snapshotFilter != null) {
snapshotFilterQueue.add(snapshotFilter);
@@ -135,14 +143,21 @@ public class OmidSnapshotFilter extends BaseRegionObserver {
return snapshotFilter;
}
- @Override
+
public RegionScanner preScannerOpen(ObserverContext<RegionCoprocessorEnvironment> e,
- Scan scan,
- RegionScanner s) throws IOException {
+ Scan scan,
+ RegionScanner s) throws IOException {
+ preScannerOpen(e,scan);
+ return s;
+ }
+
+
+ public void preScannerOpen(ObserverContext<RegionCoprocessorEnvironment> e,
+ Scan scan) throws IOException {
byte[] byteTransaction = scan.getAttribute(CellUtils.TRANSACTION_ATTRIBUTE);
if (byteTransaction == null) {
- return s;
+ return;
}
HBaseTransaction hbaseTransaction = getHBaseTransaction(byteTransaction);
@@ -153,13 +168,14 @@ public class OmidSnapshotFilter extends BaseRegionObserver {
snapshotFilter, hbaseTransaction);
scan.setFilter(newFilter);
snapshotFilterMap.put(scan, snapshotFilter);
- return s;
+ return;
}
- @Override
+
+
public RegionScanner postScannerOpen(ObserverContext<RegionCoprocessorEnvironment> e,
Scan scan,
- RegionScanner s) throws IOException {
+ RegionScanner s) {
byte[] byteTransaction = scan.getAttribute(CellUtils.TRANSACTION_ATTRIBUTE);
if (byteTransaction == null) {
@@ -173,9 +189,9 @@ public class OmidSnapshotFilter extends BaseRegionObserver {
return s;
}
+
@Override
- public void preScannerClose(ObserverContext<RegionCoprocessorEnvironment> e, InternalScanner s)
- throws IOException {
+ public void preScannerClose(ObserverContext<RegionCoprocessorEnvironment> e, InternalScanner s) {
SnapshotFilterImpl snapshotFilter = snapshotFilterMap.get(s);
if (snapshotFilter != null) {
snapshotFilterQueue.add(snapshotFilter);
@@ -192,7 +208,7 @@ public class OmidSnapshotFilter extends BaseRegionObserver {
long epoch = transaction.getEpoch();
VisibilityLevel visibilityLevel = VisibilityLevel.fromInteger(transaction.getVisibilityLevel());
- return new HBaseTransaction(id, readTs, visibilityLevel, epoch, new HashSet<HBaseCellId>(), new HashSet<HBaseCellId>(), null);
+ return new HBaseTransaction(id, readTs, visibilityLevel, epoch, new HashSet<>(), new HashSet<>(), null);
}
private CommitTable.Client initAndGetCommitTableClient() throws IOException {
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/846e5a58/hbase-coprocessor/src/main/java/org/apache/omid/transaction/TransactionFilters.java
----------------------------------------------------------------------
diff --git a/hbase-coprocessor/src/main/java/org/apache/omid/transaction/TransactionFilters.java b/hbase-coprocessor/src/main/java/org/apache/omid/transaction/TransactionFilters.java
index 588bcad..a1acc34 100644
--- a/hbase-coprocessor/src/main/java/org/apache/omid/transaction/TransactionFilters.java
+++ b/hbase-coprocessor/src/main/java/org/apache/omid/transaction/TransactionFilters.java
@@ -26,6 +26,6 @@ public class TransactionFilters {
public static Filter getVisibilityFilter(Filter cellFilter,
SnapshotFilterImpl regionAccessWrapper,
HBaseTransaction hbaseTransaction) {
- return new CellSkipFilter(new TransactionVisibilityFilter(cellFilter, regionAccessWrapper, hbaseTransaction));
+ return new CellSkipFilterBase(new TransactionVisibilityFilterBase(cellFilter, regionAccessWrapper, hbaseTransaction));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/846e5a58/hbase-coprocessor/src/main/java/org/apache/omid/transaction/TransactionVisibilityFilter.java
----------------------------------------------------------------------
diff --git a/hbase-coprocessor/src/main/java/org/apache/omid/transaction/TransactionVisibilityFilter.java b/hbase-coprocessor/src/main/java/org/apache/omid/transaction/TransactionVisibilityFilter.java
deleted file mode 100644
index dafdf3f..0000000
--- a/hbase-coprocessor/src/main/java/org/apache/omid/transaction/TransactionVisibilityFilter.java
+++ /dev/null
@@ -1,245 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.omid.transaction;
-
-import com.google.common.base.Optional;
-
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.filter.Filter;
-import org.apache.hadoop.hbase.filter.FilterBase;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.util.Bytes;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-
-public class TransactionVisibilityFilter extends FilterBase {
-
- // optional sub-filter to apply to visible cells
- private final Filter userFilter;
- private final SnapshotFilterImpl snapshotFilter;
- private final Map<Long ,Long> commitCache;
- private final HBaseTransaction hbaseTransaction;
-
- // This cache is cleared when moving to the next row
- // So no need to keep row name
- private final Map<ImmutableBytesWritable, Long> familyDeletionCache;
-
- public TransactionVisibilityFilter(Filter cellFilter,
- SnapshotFilterImpl snapshotFilter,
- HBaseTransaction hbaseTransaction) {
- this.userFilter = cellFilter;
- this.snapshotFilter = snapshotFilter;
- commitCache = new HashMap<>();
- this.hbaseTransaction = hbaseTransaction;
- familyDeletionCache = new HashMap<>();
- }
-
- @Override
- public ReturnCode filterKeyValue(Cell v) throws IOException {
- if (CellUtils.isShadowCell(v)) {
- Long commitTs = Bytes.toLong(CellUtil.cloneValue(v));
- commitCache.put(v.getTimestamp(), commitTs);
- // Continue getting shadow cells until one of them fits this transaction
- if (hbaseTransaction.getStartTimestamp() >= commitTs) {
- return ReturnCode.NEXT_COL;
- } else {
- return ReturnCode.SKIP;
- }
- }
-
- Optional<Long> ct = getCommitIfInSnapshot(v, CellUtils.isFamilyDeleteCell(v));
- if (ct.isPresent()) {
- commitCache.put(v.getTimestamp(), ct.get());
- if (hbaseTransaction.getVisibilityLevel() == AbstractTransaction.VisibilityLevel.SNAPSHOT_ALL &&
- snapshotFilter.getTSIfInTransaction(v, hbaseTransaction).isPresent()) {
- return runUserFilter(v, ReturnCode.INCLUDE);
- }
- if (CellUtils.isFamilyDeleteCell(v)) {
- familyDeletionCache.put(createImmutableBytesWritable(v), ct.get());
- if (hbaseTransaction.getVisibilityLevel() == AbstractTransaction.VisibilityLevel.SNAPSHOT_ALL) {
- return runUserFilter(v, ReturnCode.INCLUDE_AND_NEXT_COL);
- } else {
- return ReturnCode.NEXT_COL;
- }
- }
- Long deleteCommit = familyDeletionCache.get(createImmutableBytesWritable(v));
- if (deleteCommit != null && deleteCommit >= v.getTimestamp()) {
- if (hbaseTransaction.getVisibilityLevel() == AbstractTransaction.VisibilityLevel.SNAPSHOT_ALL) {
- return runUserFilter(v, ReturnCode.INCLUDE_AND_NEXT_COL);
- } else {
- return ReturnCode.NEXT_COL;
- }
- }
- if (CellUtils.isTombstone(v)) {
- if (hbaseTransaction.getVisibilityLevel() == AbstractTransaction.VisibilityLevel.SNAPSHOT_ALL) {
- return runUserFilter(v, ReturnCode.INCLUDE_AND_NEXT_COL);
- } else {
- return ReturnCode.NEXT_COL;
- }
- }
-
- return runUserFilter(v, ReturnCode.INCLUDE_AND_NEXT_COL);
- }
-
- return ReturnCode.SKIP;
- }
-
-
- private ImmutableBytesWritable createImmutableBytesWritable(Cell v) {
- return new ImmutableBytesWritable(v.getFamilyArray(),
- v.getFamilyOffset(),v.getFamilyLength());
- }
-
- private ReturnCode runUserFilter(Cell v, ReturnCode snapshotReturn)
- throws IOException {
- assert(snapshotReturn == ReturnCode.INCLUDE_AND_NEXT_COL || snapshotReturn == ReturnCode.INCLUDE);
- if (userFilter == null) {
- return snapshotReturn;
- }
-
- ReturnCode userRes = userFilter.filterKeyValue(v);
- switch (userRes) {
- case INCLUDE:
- return snapshotReturn;
- case SKIP:
- return (snapshotReturn == ReturnCode.INCLUDE) ? ReturnCode.SKIP: ReturnCode.NEXT_COL;
- default:
- return userRes;
- }
-
- }
-
- // For family delete cells, the sc hasn't arrived yet so get sc from region before going to ct
- private Optional<Long> getCommitIfInSnapshot(Cell v, boolean getShadowCellBeforeCT) throws IOException {
- Long cachedCommitTS = commitCache.get(v.getTimestamp());
- if (cachedCommitTS != null && hbaseTransaction.getStartTimestamp() >= cachedCommitTS) {
- return Optional.of(cachedCommitTS);
- }
- if (snapshotFilter.getTSIfInTransaction(v, hbaseTransaction).isPresent()) {
- return Optional.of(v.getTimestamp());
- }
-
- if (getShadowCellBeforeCT) {
-
- // Try to get shadow cell from region
- final Get get = new Get(CellUtil.cloneRow(v));
- get.setTimeStamp(v.getTimestamp()).setMaxVersions(1);
- get.addColumn(CellUtil.cloneFamily(v), CellUtils.addShadowCellSuffixPrefix(CellUtils.FAMILY_DELETE_QUALIFIER));
- Result shadowCell = snapshotFilter.getTableAccessWrapper().get(get);
-
- if (!shadowCell.isEmpty()) {
- long commitTS = Bytes.toLong(CellUtil.cloneValue(shadowCell.rawCells()[0]));
- if (commitTS <= hbaseTransaction.getStartTimestamp()) {
- return Optional.of(commitTS);
- }
- }
- }
-
- return snapshotFilter.getTSIfInSnapshot(v, hbaseTransaction, commitCache);
- }
-
-
- @Override
- public void reset() throws IOException {
- commitCache.clear();
- familyDeletionCache.clear();
- if (userFilter != null) {
- userFilter.reset();
- }
- }
-
- @Override
- public boolean filterRow() throws IOException {
- if (userFilter != null) {
- return userFilter.filterRow();
- }
- return super.filterRow();
- }
-
-
- @Override
- public boolean filterRowKey(byte[] buffer, int offset, int length) throws IOException {
- if (userFilter != null) {
- return userFilter.filterRowKey(buffer, offset, length);
- }
- return super.filterRowKey(buffer, offset, length);
- }
-
- @Override
- public boolean filterAllRemaining() throws IOException {
- if (userFilter != null) {
- return userFilter.filterAllRemaining();
- }
- return super.filterAllRemaining();
- }
-
- @Override
- public void filterRowCells(List<Cell> kvs) throws IOException {
- if (userFilter != null) {
- userFilter.filterRowCells(kvs);
- } else {
- super.filterRowCells(kvs);
- }
- }
-
- @Override
- public boolean hasFilterRow() {
- if (userFilter != null) {
- return userFilter.hasFilterRow();
- }
- return super.hasFilterRow();
- }
-
-
- @Override
- public KeyValue getNextKeyHint(KeyValue currentKV) throws IOException {
- if (userFilter != null) {
- return userFilter.getNextKeyHint(currentKV);
- }
- return super.getNextKeyHint(currentKV);
- }
-
- @Override
- public Cell getNextCellHint(Cell currentKV) throws IOException {
- if (userFilter != null) {
- return userFilter.getNextCellHint(currentKV);
- }
- return super.getNextCellHint(currentKV);
- }
-
- @Override
- public boolean isFamilyEssential(byte[] name) throws IOException {
- if (userFilter != null) {
- return userFilter.isFamilyEssential(name);
- }
- return super.isFamilyEssential(name);
- }
-
- @Override
- public byte[] toByteArray() throws IOException {
- return super.toByteArray();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/846e5a58/hbase-coprocessor/src/main/java/org/apache/omid/transaction/TransactionVisibilityFilterBase.java
----------------------------------------------------------------------
diff --git a/hbase-coprocessor/src/main/java/org/apache/omid/transaction/TransactionVisibilityFilterBase.java b/hbase-coprocessor/src/main/java/org/apache/omid/transaction/TransactionVisibilityFilterBase.java
new file mode 100644
index 0000000..287d2a2
--- /dev/null
+++ b/hbase-coprocessor/src/main/java/org/apache/omid/transaction/TransactionVisibilityFilterBase.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.omid.transaction;
+
+import com.google.common.base.Optional;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.omid.OmidFilterBase;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+
+public class TransactionVisibilityFilterBase extends OmidFilterBase {
+
+ // optional sub-filter to apply to visible cells
+ private final Filter userFilter;
+ private final SnapshotFilterImpl snapshotFilter;
+ private final Map<Long ,Long> commitCache;
+ private final HBaseTransaction hbaseTransaction;
+
+ // This cache is cleared when moving to the next row
+ // So no need to keep row name
+ private final Map<ImmutableBytesWritable, Long> familyDeletionCache;
+
+ public TransactionVisibilityFilterBase(Filter cellFilter,
+ SnapshotFilterImpl snapshotFilter,
+ HBaseTransaction hbaseTransaction) {
+ this.userFilter = cellFilter;
+ this.snapshotFilter = snapshotFilter;
+ commitCache = new HashMap<>();
+ this.hbaseTransaction = hbaseTransaction;
+ familyDeletionCache = new HashMap<>();
+ }
+
+ @Override
+ public ReturnCode filterKeyValue(Cell v) throws IOException {
+ if (CellUtils.isShadowCell(v)) {
+ Long commitTs = Bytes.toLong(CellUtil.cloneValue(v));
+ commitCache.put(v.getTimestamp(), commitTs);
+ // Continue getting shadow cells until one of them fits this transaction
+ if (hbaseTransaction.getStartTimestamp() >= commitTs) {
+ return ReturnCode.NEXT_COL;
+ } else {
+ return ReturnCode.SKIP;
+ }
+ }
+
+ Optional<Long> ct = getCommitIfInSnapshot(v, CellUtils.isFamilyDeleteCell(v));
+ if (ct.isPresent()) {
+ commitCache.put(v.getTimestamp(), ct.get());
+ if (hbaseTransaction.getVisibilityLevel() == AbstractTransaction.VisibilityLevel.SNAPSHOT_ALL &&
+ snapshotFilter.getTSIfInTransaction(v, hbaseTransaction).isPresent()) {
+ return runUserFilter(v, ReturnCode.INCLUDE);
+ }
+ if (CellUtils.isFamilyDeleteCell(v)) {
+ familyDeletionCache.put(createImmutableBytesWritable(v), ct.get());
+ if (hbaseTransaction.getVisibilityLevel() == AbstractTransaction.VisibilityLevel.SNAPSHOT_ALL) {
+ return runUserFilter(v, ReturnCode.INCLUDE_AND_NEXT_COL);
+ } else {
+ return ReturnCode.NEXT_COL;
+ }
+ }
+ Long deleteCommit = familyDeletionCache.get(createImmutableBytesWritable(v));
+ if (deleteCommit != null && deleteCommit >= v.getTimestamp()) {
+ if (hbaseTransaction.getVisibilityLevel() == AbstractTransaction.VisibilityLevel.SNAPSHOT_ALL) {
+ return runUserFilter(v, ReturnCode.INCLUDE_AND_NEXT_COL);
+ } else {
+ return ReturnCode.NEXT_COL;
+ }
+ }
+ if (CellUtils.isTombstone(v)) {
+ if (hbaseTransaction.getVisibilityLevel() == AbstractTransaction.VisibilityLevel.SNAPSHOT_ALL) {
+ return runUserFilter(v, ReturnCode.INCLUDE_AND_NEXT_COL);
+ } else {
+ return ReturnCode.NEXT_COL;
+ }
+ }
+
+ return runUserFilter(v, ReturnCode.INCLUDE_AND_NEXT_COL);
+ }
+
+ return ReturnCode.SKIP;
+ }
+
+
+ private ImmutableBytesWritable createImmutableBytesWritable(Cell v) {
+ return new ImmutableBytesWritable(v.getFamilyArray(),
+ v.getFamilyOffset(),v.getFamilyLength());
+ }
+
+ private ReturnCode runUserFilter(Cell v, ReturnCode snapshotReturn)
+ throws IOException {
+ assert(snapshotReturn == ReturnCode.INCLUDE_AND_NEXT_COL || snapshotReturn == ReturnCode.INCLUDE);
+ if (userFilter == null) {
+ return snapshotReturn;
+ }
+
+ ReturnCode userRes = userFilter.filterKeyValue(v);
+ switch (userRes) {
+ case INCLUDE:
+ return snapshotReturn;
+ case SKIP:
+ return (snapshotReturn == ReturnCode.INCLUDE) ? ReturnCode.SKIP: ReturnCode.NEXT_COL;
+ default:
+ return userRes;
+ }
+
+ }
+
+ // For family delete cells, the shadow cell (sc) may not have arrived yet, so try to read it from the region before falling back to the commit table (ct)
+ private Optional<Long> getCommitIfInSnapshot(Cell v, boolean getShadowCellBeforeCT) throws IOException {
+ Long cachedCommitTS = commitCache.get(v.getTimestamp());
+ if (cachedCommitTS != null && hbaseTransaction.getStartTimestamp() >= cachedCommitTS) {
+ return Optional.of(cachedCommitTS);
+ }
+ if (snapshotFilter.getTSIfInTransaction(v, hbaseTransaction).isPresent()) {
+ return Optional.of(v.getTimestamp());
+ }
+
+ if (getShadowCellBeforeCT) {
+
+ // Try to get shadow cell from region
+ final Get get = new Get(CellUtil.cloneRow(v));
+ get.setTimeStamp(v.getTimestamp()).setMaxVersions(1);
+ get.addColumn(CellUtil.cloneFamily(v), CellUtils.addShadowCellSuffixPrefix(CellUtils.FAMILY_DELETE_QUALIFIER));
+ Result shadowCell = snapshotFilter.getTableAccessWrapper().get(get);
+
+ if (!shadowCell.isEmpty()) {
+ long commitTS = Bytes.toLong(CellUtil.cloneValue(shadowCell.rawCells()[0]));
+ if (commitTS <= hbaseTransaction.getStartTimestamp()) {
+ return Optional.of(commitTS);
+ }
+ }
+ }
+
+ return snapshotFilter.getTSIfInSnapshot(v, hbaseTransaction, commitCache);
+ }
+
+
+ @Override
+ public void reset() throws IOException {
+ commitCache.clear();
+ familyDeletionCache.clear();
+ if (userFilter != null) {
+ userFilter.reset();
+ }
+ }
+
+ @Override
+ public boolean filterRow() throws IOException {
+ if (userFilter != null) {
+ return userFilter.filterRow();
+ }
+ return super.filterRow();
+ }
+
+
+ @Override
+ public boolean filterRowKey(byte[] buffer, int offset, int length) throws IOException {
+ if (userFilter != null) {
+ return userFilter.filterRowKey(buffer, offset, length);
+ }
+ return super.filterRowKey(buffer, offset, length);
+ }
+
+ @Override
+ public boolean filterAllRemaining() throws IOException {
+ if (userFilter != null) {
+ return userFilter.filterAllRemaining();
+ }
+ return super.filterAllRemaining();
+ }
+
+ @Override
+ public void filterRowCells(List<Cell> kvs) throws IOException {
+ if (userFilter != null) {
+ userFilter.filterRowCells(kvs);
+ } else {
+ super.filterRowCells(kvs);
+ }
+ }
+
+ @Override
+ public boolean hasFilterRow() {
+ if (userFilter != null) {
+ return userFilter.hasFilterRow();
+ }
+ return super.hasFilterRow();
+ }
+
+ @Override
+ public Cell getNextCellHint(Cell currentKV) throws IOException {
+ if (userFilter != null) {
+ return userFilter.getNextCellHint(currentKV);
+ }
+ return super.getNextCellHint(currentKV);
+ }
+
+ @Override
+ public boolean isFamilyEssential(byte[] name) throws IOException {
+ if (userFilter != null) {
+ return userFilter.isFamilyEssential(name);
+ }
+ return super.isFamilyEssential(name);
+ }
+
+ @Override
+ public byte[] toByteArray() throws IOException {
+ return super.toByteArray();
+ }
+
+ public Filter getInnerFilter() {
+ return userFilter;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/846e5a58/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TSOForHBaseCompactorTestModule.java
----------------------------------------------------------------------
diff --git a/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TSOForHBaseCompactorTestModule.java b/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TSOForHBaseCompactorTestModule.java
index abfe67c..59c01db 100644
--- a/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TSOForHBaseCompactorTestModule.java
+++ b/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TSOForHBaseCompactorTestModule.java
@@ -87,6 +87,7 @@ class TSOForHBaseCompactorTestModule extends AbstractModule {
@Singleton
Configuration provideHBaseConfig() throws IOException {
Configuration hbaseConf = HBaseConfiguration.create();
+ hbaseConf.setBoolean("hbase.localcluster.assign.random.ports",true);
hbaseConf.setInt("hbase.hregion.memstore.flush.size", 10_000 * 1024);
hbaseConf.setInt("hbase.regionserver.nbreservationblocks", 1);
hbaseConf.set("tso.host", "localhost");
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/846e5a58/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TSOForSnapshotFilterTestModule.java
----------------------------------------------------------------------
diff --git a/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TSOForSnapshotFilterTestModule.java b/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TSOForSnapshotFilterTestModule.java
index 130a061..446b9d0 100644
--- a/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TSOForSnapshotFilterTestModule.java
+++ b/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TSOForSnapshotFilterTestModule.java
@@ -87,11 +87,11 @@ class TSOForSnapshotFilterTestModule extends AbstractModule {
@Singleton
Configuration provideHBaseConfig() throws IOException {
Configuration hbaseConf = HBaseConfiguration.create();
+ hbaseConf.setBoolean("hbase.localcluster.assign.random.ports",true);
hbaseConf.setInt("hbase.hregion.memstore.flush.size", 10_000 * 1024);
hbaseConf.setInt("hbase.regionserver.nbreservationblocks", 1);
hbaseConf.set("tso.host", "localhost");
hbaseConf.setInt("tso.port", 1234);
- hbaseConf.set("hbase.coprocessor.region.classes", "org.apache.omid.transaction.OmidSnapshotFilter");
final String rootdir = "/tmp/hbase.test.dir/";
File rootdirFile = new File(rootdir);
FileUtils.deleteDirectory(rootdirFile);
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/846e5a58/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestCompaction.java
----------------------------------------------------------------------
diff --git a/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestCompaction.java b/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestCompaction.java
index 2aae1b5..4d20458 100644
--- a/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestCompaction.java
+++ b/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestCompaction.java
@@ -39,6 +39,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
@@ -56,8 +57,6 @@ import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.client.coprocessor.AggregationClient;
-import org.apache.hadoop.hbase.client.coprocessor.LongColumnInterpreter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.omid.HBaseShims;
import org.apache.omid.TestUtils;
@@ -105,10 +104,10 @@ public class TestCompaction {
private TSOServer tso;
- private AggregationClient aggregationClient;
+
private CommitTable commitTable;
private PostCommitActions syncPostCommitter;
- private Connection connection;
+ private static Connection connection;
@BeforeClass
public void setupTestCompation() throws Exception {
@@ -123,10 +122,8 @@ public class TestCompaction {
// settings required for #testDuplicateDeletes()
hbaseConf.setInt("hbase.hstore.compaction.min", 2);
hbaseConf.setInt("hbase.hstore.compaction.max", 2);
-
setupHBase();
connection = ConnectionFactory.createConnection(hbaseConf);
- aggregationClient = new AggregationClient(hbaseConf);
admin = connection.getAdmin();
createRequiredHBaseTables(hBaseTimestampStorageConfig, hBaseCommitTableConfig);
setupTSO();
@@ -163,10 +160,10 @@ public class TestCompaction {
desc.addFamily(datafam);
}
- desc.addCoprocessor("org.apache.hadoop.hbase.coprocessor.AggregateImplementation");
+ desc.addCoprocessor("org.apache.hadoop.hbase.coprocessor.AggregateImplementation",null,Coprocessor.PRIORITY_HIGHEST,null);
admin.createTable(desc);
for (byte[] family : families) {
- CompactorUtil.enableOmidCompaction(hbaseConf, TableName.valueOf(tableName), family);
+ CompactorUtil.enableOmidCompaction(connection, TableName.valueOf(tableName), family);
}
}
@@ -1189,10 +1186,16 @@ public class TestCompaction {
LOG.info("Waking up after 3 secs");
}
- private long rowCount(String tableName, byte[] family) throws Throwable {
+ private static long rowCount(String tableName, byte[] family) throws Throwable {
Scan scan = new Scan();
scan.addFamily(family);
- return aggregationClient.rowCount(TableName.valueOf(tableName), new LongColumnInterpreter(), scan);
+ Table table = connection.getTable(TableName.valueOf(tableName));
+ try (ResultScanner scanner = table.getScanner(scan)) {
+ int count = 0;
+ while (scanner.next() != null) {
+ count++;
+ }
+ return count;
+ }
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/846e5a58/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestSnapshotFilter.java
----------------------------------------------------------------------
diff --git a/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestSnapshotFilter.java b/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestSnapshotFilter.java
index 8934b08..d698201 100644
--- a/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestSnapshotFilter.java
+++ b/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestSnapshotFilter.java
@@ -29,6 +29,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
@@ -103,7 +104,18 @@ public class TestSnapshotFilter {
injector = Guice.createInjector(new TSOForSnapshotFilterTestModule(tsoConfig));
hbaseConf = injector.getInstance(Configuration.class);
hbaseConf.setBoolean("omid.server.side.filter", true);
- hbaseConf.setInt("hbase.master.info.port", 16011);
+ hbaseConf.setInt("hbase.hconnection.threads.core", 5);
+ hbaseConf.setInt("hbase.hconnection.threads.max", 10);
+ // Tune down handler threads in regionserver
+ hbaseConf.setInt("hbase.regionserver.handler.count", 10);
+
+ // Set to random port
+ hbaseConf.setInt("hbase.master.port", 0);
+ hbaseConf.setInt("hbase.master.info.port", 0);
+ hbaseConf.setInt("hbase.regionserver.port", 0);
+ hbaseConf.setInt("hbase.regionserver.info.port", 0);
+
+
HBaseCommitTableConfig hBaseCommitTableConfig = injector.getInstance(HBaseCommitTableConfig.class);
HBaseTimestampStorageConfig hBaseTimestampStorageConfig = injector.getInstance(HBaseTimestampStorageConfig.class);
@@ -145,8 +157,17 @@ public class TestSnapshotFilter {
desc.addFamily(datafam);
}
- desc.addCoprocessor("org.apache.hadoop.hbase.coprocessor.AggregateImplementation");
+ int priority = Coprocessor.PRIORITY_HIGHEST;
+
+ desc.addCoprocessor(OmidSnapshotFilter.class.getName(),null,++priority,null);
+ desc.addCoprocessor("org.apache.hadoop.hbase.coprocessor.AggregateImplementation",null,++priority,null);
+
admin.createTable(desc);
+ try {
+ hbaseTestUtil.waitTableAvailable(TableName.valueOf(tableName),5000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
}
}
@@ -342,6 +363,7 @@ public class TestSnapshotFilter {
String TEST_TABLE = "testGetWithFamilyDelete";
createTableIfNotExists(TEST_TABLE, Bytes.toBytes(TEST_FAMILY), famName2);
+
TTable tt = new TTable(connection, TEST_TABLE);
Transaction tx1 = tm.begin();
@@ -361,7 +383,7 @@ public class TestSnapshotFilter {
Transaction tx3 = tm.begin();
Delete d = new Delete(rowName1);
- d.deleteFamily(famName2);
+ d.addFamily(famName2);
tt.delete(tx3, d);
@@ -854,4 +876,5 @@ public class TestSnapshotFilter {
tt.close();
}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/846e5a58/hbase-shims/hbase-0/pom.xml
----------------------------------------------------------------------
diff --git a/hbase-shims/hbase-0/pom.xml b/hbase-shims/hbase-0/pom.xml
deleted file mode 100644
index aa7f067..0000000
--- a/hbase-shims/hbase-0/pom.xml
+++ /dev/null
@@ -1,19 +0,0 @@
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>org.apache.omid</groupId>
- <artifactId>omid-shims-aggregator</artifactId>
- <version>0.8.2.11-SNAPSHOT</version>
- </parent>
-
- <artifactId>omid-hbase0-shims</artifactId>
- <name>Shims layer for HBase 0.x</name>
- <packaging>jar</packaging>
-
- <properties>
- <hbase.version>${hbase0.version}</hbase.version>
- </properties>
-
-</project>
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/846e5a58/hbase-shims/hbase-0/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
----------------------------------------------------------------------
diff --git a/hbase-shims/hbase-0/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java b/hbase-shims/hbase-0/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
deleted file mode 100644
index a419a1d..0000000
--- a/hbase-shims/hbase-0/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.regionserver;
-
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-
-import java.io.IOException;
-
-public class Region {
-
- HRegion hRegion;
-
- public Region(HRegion hRegion) {
-
- this.hRegion = hRegion;
-
- }
-
- Result get(Get getOperation) throws IOException {
-
- return hRegion.get(getOperation);
-
- }
-
- void put(Put putOperation) throws IOException {
-
- hRegion.put(putOperation);
-
- }
-
- HRegionInfo getRegionInfo() {
-
- return hRegion.getRegionInfo();
-
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/846e5a58/hbase-shims/hbase-0/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java
----------------------------------------------------------------------
diff --git a/hbase-shims/hbase-0/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java b/hbase-shims/hbase-0/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java
deleted file mode 100644
index 56990f6..0000000
--- a/hbase-shims/hbase-0/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.regionserver;
-
-public class ScannerContext {
-
- int getBatchLimit() {
-
- return -1;
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/846e5a58/hbase-shims/hbase-0/src/main/java/org/apache/omid/HBaseShims.java
----------------------------------------------------------------------
diff --git a/hbase-shims/hbase-0/src/main/java/org/apache/omid/HBaseShims.java b/hbase-shims/hbase-0/src/main/java/org/apache/omid/HBaseShims.java
deleted file mode 100644
index e66afca..0000000
--- a/hbase-shims/hbase-0/src/main/java/org/apache/omid/HBaseShims.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.omid;
-
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.regionserver.HRegion;
-import org.apache.hadoop.hbase.regionserver.HRegionServer;
-import org.apache.hadoop.hbase.regionserver.Region;
-
-import java.io.IOException;
-
-public class HBaseShims {
-
- static public void setKeyValueSequenceId(KeyValue kv, int sequenceId) {
-
- kv.setMvccVersion(sequenceId);
-
- }
-
- static public Region getRegionCoprocessorRegion(RegionCoprocessorEnvironment env) {
-
- return new Region(env.getRegion());
-
- }
-
- static public void flushAllOnlineRegions(HRegionServer regionServer, TableName tableName) throws IOException {
-
- for (HRegion r : regionServer.getOnlineRegions(tableName)) {
- r.flushcache();
- }
-
- }
-
- static public void addFamilyToHTableDescriptor(HTableDescriptor tableDesc, HColumnDescriptor columnDesc) {
-
- tableDesc.addFamily(columnDesc);
-
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/846e5a58/hbase-shims/hbase-1/src/main/java/org/apache/omid/HBaseShims.java
----------------------------------------------------------------------
diff --git a/hbase-shims/hbase-1/src/main/java/org/apache/omid/HBaseShims.java b/hbase-shims/hbase-1/src/main/java/org/apache/omid/HBaseShims.java
index 636e892..85cb58e 100644
--- a/hbase-shims/hbase-1/src/main/java/org/apache/omid/HBaseShims.java
+++ b/hbase-shims/hbase-1/src/main/java/org/apache/omid/HBaseShims.java
@@ -17,13 +17,21 @@
*/
package org.apache.omid;
+import org.apache.hadoop.hbase.CellComparator;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
@@ -55,4 +63,27 @@ public class HBaseShims {
}
+ public static CellComparator cellComparatorInstance() {
+ return new CellComparator();
+ }
+
+ public static boolean OmidCompactionEnabled(ObserverContext<RegionCoprocessorEnvironment> env,
+ Store store,
+ String cfFlagValue) {
+ HTableDescriptor desc = env.getEnvironment().getRegion().getTableDesc();
+ HColumnDescriptor famDesc
+ = desc.getFamily(Bytes.toBytes(store.getColumnFamilyName()));
+ return Boolean.valueOf(famDesc.getValue(cfFlagValue));
+ }
+
+
+ public static void setCompaction(Connection conn, TableName table, byte[] columnFamily, String key, String value)
+ throws IOException {
+ try(Admin admin = conn.getAdmin()) {
+ HTableDescriptor desc = admin.getTableDescriptor(table);
+ HColumnDescriptor cfDesc = desc.getFamily(columnFamily);
+ cfDesc.setValue(key, value);
+ admin.modifyColumn(table, cfDesc);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/846e5a58/hbase-shims/hbase-1/src/main/java/org/apache/omid/OmidFilterBase.java
----------------------------------------------------------------------
diff --git a/hbase-shims/hbase-1/src/main/java/org/apache/omid/OmidFilterBase.java b/hbase-shims/hbase-1/src/main/java/org/apache/omid/OmidFilterBase.java
new file mode 100644
index 0000000..b96fbc1
--- /dev/null
+++ b/hbase-shims/hbase-1/src/main/java/org/apache/omid/OmidFilterBase.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.omid;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterBase;
+
+import java.io.IOException;
+
+public abstract class OmidFilterBase extends FilterBase {
+
+ public KeyValue getNextKeyHint(KeyValue currentKV) throws IOException {
+ Filter userFilter = getInnerFilter();
+ if (userFilter != null) {
+ return userFilter.getNextKeyHint(currentKV);
+ }
+ return super.getNextKeyHint(currentKV);
+ }
+
+ protected abstract Filter getInnerFilter();
+}