Posted to dev@omid.apache.org by yonigottesman <gi...@git.apache.org> on 2018/07/25 06:45:57 UTC
[GitHub] incubator-omid pull request #41: Support for user Filter when using coproces...
GitHub user yonigottesman opened a pull request:
https://github.com/apache/incubator-omid/pull/41
Support for user Filter when using coprocessor for snapshot filtering
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/yonigottesman/incubator-omid scan_filters
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/incubator-omid/pull/41.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #41
----
commit 41554ec4c3cdfdbfd271e9adbef866b3653c3382
Author: Yonatan Gottesman <yo...@...>
Date: 2018-07-25T06:40:14Z
Support for user Filter when using coprocessor for snapshot filtering
----
---
[GitHub] incubator-omid pull request #41: [OMID-102] Support for user Filter when usi...
Posted by ohadshacham <gi...@git.apache.org>.
Github user ohadshacham commented on a diff in the pull request:
https://github.com/apache/incubator-omid/pull/41#discussion_r206856182
--- Diff: hbase-coprocessor/src/main/java/org/apache/omid/transaction/TransactionVisibilityFilter.java ---
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.omid.transaction;
+
+import com.google.common.base.Optional;
+import com.sun.istack.Nullable;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterBase;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import java.io.IOException;
+import java.util.*;
+
+public class TransactionVisibilityFilter extends FilterBase {
+
+ // optional sub-filter to apply to visible cells
+ private final Filter userFilter;
+ private final SnapshotFilterImpl snapshotFilter;
+ private final Map<Long ,Long> shadowCellCache;
+ private final HBaseTransaction hbaseTransaction;
+ private final Map<String, Long> familyDeletionCache;
+
+ public SnapshotFilter getSnapshotFilter() {
+ return snapshotFilter;
+ }
+
+ public TransactionVisibilityFilter(@Nullable Filter cellFilter,
+ SnapshotFilterImpl snapshotFilter,
+ HBaseTransaction hbaseTransaction) {
+ this.userFilter = cellFilter;
+ this.snapshotFilter = snapshotFilter;
+ shadowCellCache = new HashMap<>();
+ this.hbaseTransaction = hbaseTransaction;
+ familyDeletionCache = new HashMap<String, Long>();
+ }
+
+ @Override
+ public ReturnCode filterKeyValue(Cell v) throws IOException {
+ if (CellUtils.isShadowCell(v)) {
+ Long commitTs = Bytes.toLong(CellUtil.cloneValue(v));
+ shadowCellCache.put(v.getTimestamp(), commitTs);
+ // Continue getting shadow cells until one of them fits this transaction
+ if (hbaseTransaction.getStartTimestamp() >= commitTs) {
+ return ReturnCode.NEXT_COL;
+ } else {
+ return ReturnCode.SKIP;
+ }
+ } else if (CellUtils.isFamilyDeleteCell(v)) {
+ //Delete is part of this transaction
+ if (snapshotFilter.getTSIfInTransaction(v, hbaseTransaction).isPresent()) {
+ familyDeletionCache.put(Bytes.toString(CellUtil.cloneFamily(v)), v.getTimestamp());
+ return ReturnCode.NEXT_COL;
+ }
+
+ if (shadowCellCache.containsKey(v.getTimestamp()) &&
--- End diff --
You can do a single get() and check the returned value for null instead of calling containsKey() and then get(). It saves a collection access.
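A minimal sketch of the single-lookup pattern suggested above, assuming the cache never stores null values. The class and method names (ShadowCellLookup, committedBefore) are hypothetical and are not part of the pull request:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper illustrating one map access instead of containsKey() + get().
final class ShadowCellLookup {

    // Returns true when a commit timestamp is cached for cellTs and it is
    // not later than the reading transaction's start timestamp.
    static boolean committedBefore(Map<Long, Long> shadowCellCache, long cellTs, long startTs) {
        Long commitTs = shadowCellCache.get(cellTs); // null means "not cached"
        return commitTs != null && startTs >= commitTs;
    }
}
```

The returned commitTs could also be reused for the familyDeletionCache update, which would avoid a further lookup on the same key.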
---
[GitHub] incubator-omid issue #41: [OMID-102] Support for user Filter when using copr...
Posted by JamesRTaylor <gi...@git.apache.org>.
Github user JamesRTaylor commented on the issue:
https://github.com/apache/incubator-omid/pull/41
This is great, @yonigottesman! Do the Phoenix unit tests FlappingTransactionIT.testInflightUpdateNotSeen() and testInflightDeleteNotSeen() pass with this change? You can try running them from the omid2 feature branch.
---
[GitHub] incubator-omid pull request #41: [OMID-102] Support for user Filter when usi...
Posted by JamesRTaylor <gi...@git.apache.org>.
Github user JamesRTaylor commented on a diff in the pull request:
https://github.com/apache/incubator-omid/pull/41#discussion_r206936274
--- Diff: hbase-common/src/main/java/org/apache/omid/transaction/CellUtils.java ---
@@ -52,6 +52,7 @@
static byte[] DELETE_TOMBSTONE = Bytes.toBytes("__OMID_TOMBSTONE__");
--- End diff --
Will Cells end up with these constant values, and if so, can we make them shorter?
---
[GitHub] incubator-omid pull request #41: [OMID-102] Support for user Filter when usi...
Posted by ohadshacham <gi...@git.apache.org>.
Github user ohadshacham commented on a diff in the pull request:
https://github.com/apache/incubator-omid/pull/41#discussion_r206857467
--- Diff: hbase-coprocessor/src/main/java/org/apache/omid/transaction/TransactionVisibilityFilter.java ---
@@ -0,0 +1,248 @@
+package org.apache.omid.transaction;
+
+import com.google.common.base.Optional;
+import com.sun.istack.Nullable;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterBase;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import java.io.IOException;
+import java.util.*;
+
+public class TransactionVisibilityFilter extends FilterBase {
+
+ // optional sub-filter to apply to visible cells
+ private final Filter userFilter;
+ private final SnapshotFilterImpl snapshotFilter;
+ private final Map<Long ,Long> shadowCellCache;
+ private final HBaseTransaction hbaseTransaction;
+ private final Map<String, Long> familyDeletionCache;
+
+ public SnapshotFilter getSnapshotFilter() {
+ return snapshotFilter;
+ }
+
+ public TransactionVisibilityFilter(@Nullable Filter cellFilter,
+ SnapshotFilterImpl snapshotFilter,
+ HBaseTransaction hbaseTransaction) {
+ this.userFilter = cellFilter;
+ this.snapshotFilter = snapshotFilter;
+ shadowCellCache = new HashMap<>();
+ this.hbaseTransaction = hbaseTransaction;
+ familyDeletionCache = new HashMap<String, Long>();
+ }
+
+ @Override
+ public ReturnCode filterKeyValue(Cell v) throws IOException {
+ if (CellUtils.isShadowCell(v)) {
+ Long commitTs = Bytes.toLong(CellUtil.cloneValue(v));
+ shadowCellCache.put(v.getTimestamp(), commitTs);
+ // Continue getting shadow cells until one of them fits this transaction
+ if (hbaseTransaction.getStartTimestamp() >= commitTs) {
+ return ReturnCode.NEXT_COL;
+ } else {
+ return ReturnCode.SKIP;
+ }
+ } else if (CellUtils.isFamilyDeleteCell(v)) {
+ //Delete is part of this transaction
+ if (snapshotFilter.getTSIfInTransaction(v, hbaseTransaction).isPresent()) {
+ familyDeletionCache.put(Bytes.toString(CellUtil.cloneFamily(v)), v.getTimestamp());
+ return ReturnCode.NEXT_COL;
+ }
+
+ if (shadowCellCache.containsKey(v.getTimestamp()) &&
+ hbaseTransaction.getStartTimestamp() >= shadowCellCache.get(v.getTimestamp())) {
+ familyDeletionCache.put(Bytes.toString(CellUtil.cloneFamily(v)), shadowCellCache.get(v.getTimestamp()));
+ return ReturnCode.NEXT_COL;
+ }
+
+ // Try to get shadow cell from region
+ final Get get = new Get(CellUtil.cloneRow(v));
+ get.setTimeStamp(v.getTimestamp()).setMaxVersions(1);
+ get.addColumn(CellUtil.cloneFamily(v), CellUtils.addShadowCellSuffix(CellUtils.FAMILY_DELETE_QUALIFIER));
--- End diff --
Change the name of addShadowCellSuffix to addShadowCellMetadata?
---
[GitHub] incubator-omid pull request #41: [OMID-102] Support for user Filter when usi...
Posted by JamesRTaylor <gi...@git.apache.org>.
Github user JamesRTaylor commented on a diff in the pull request:
https://github.com/apache/incubator-omid/pull/41#discussion_r206935434
--- Diff: hbase-coprocessor/src/main/java/org/apache/omid/transaction/TransactionVisibilityFilter.java ---
@@ -0,0 +1,248 @@
+package org.apache.omid.transaction;
+
+import com.google.common.base.Optional;
+import com.sun.istack.Nullable;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterBase;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import java.io.IOException;
+import java.util.*;
+
+public class TransactionVisibilityFilter extends FilterBase {
+
+ // optional sub-filter to apply to visible cells
+ private final Filter userFilter;
+ private final SnapshotFilterImpl snapshotFilter;
+ private final Map<Long ,Long> shadowCellCache;
+ private final HBaseTransaction hbaseTransaction;
+ private final Map<String, Long> familyDeletionCache;
+
+ public SnapshotFilter getSnapshotFilter() {
+ return snapshotFilter;
+ }
+
+ public TransactionVisibilityFilter(@Nullable Filter cellFilter,
+ SnapshotFilterImpl snapshotFilter,
+ HBaseTransaction hbaseTransaction) {
+ this.userFilter = cellFilter;
+ this.snapshotFilter = snapshotFilter;
+ shadowCellCache = new HashMap<>();
+ this.hbaseTransaction = hbaseTransaction;
+ familyDeletionCache = new HashMap<String, Long>();
+ }
+
+ @Override
+ public ReturnCode filterKeyValue(Cell v) throws IOException {
+ if (CellUtils.isShadowCell(v)) {
+ Long commitTs = Bytes.toLong(CellUtil.cloneValue(v));
+ shadowCellCache.put(v.getTimestamp(), commitTs);
+ // Continue getting shadow cells until one of them fits this transaction
+ if (hbaseTransaction.getStartTimestamp() >= commitTs) {
+ return ReturnCode.NEXT_COL;
+ } else {
+ return ReturnCode.SKIP;
+ }
+ } else if (CellUtils.isFamilyDeleteCell(v)) {
+ //Delete is part of this transaction
+ if (snapshotFilter.getTSIfInTransaction(v, hbaseTransaction).isPresent()) {
+ familyDeletionCache.put(Bytes.toString(CellUtil.cloneFamily(v)), v.getTimestamp());
+ return ReturnCode.NEXT_COL;
+ }
+
+ if (shadowCellCache.containsKey(v.getTimestamp()) &&
+ hbaseTransaction.getStartTimestamp() >= shadowCellCache.get(v.getTimestamp())) {
+ familyDeletionCache.put(Bytes.toString(CellUtil.cloneFamily(v)), shadowCellCache.get(v.getTimestamp()));
+ return ReturnCode.NEXT_COL;
+ }
+
+ // Try to get shadow cell from region
+ final Get get = new Get(CellUtil.cloneRow(v));
+ get.setTimeStamp(v.getTimestamp()).setMaxVersions(1);
+ get.addColumn(CellUtil.cloneFamily(v), CellUtils.addShadowCellSuffix(CellUtils.FAMILY_DELETE_QUALIFIER));
+ Result deleteFamilySC = snapshotFilter.getTableAccessWrapper().get(get);
+
+ if (!deleteFamilySC.isEmpty() &&
+ Bytes.toLong(CellUtil.cloneValue(deleteFamilySC.rawCells()[0] )) < hbaseTransaction.getStartTimestamp()){
+ familyDeletionCache.put(Bytes.toString(CellUtil.cloneFamily(v)), Bytes.toLong(CellUtil.cloneValue(deleteFamilySC.rawCells()[0])));
+ return ReturnCode.NEXT_COL;
+ }
+
+ //At last go to commit table
+ Optional<Long> commitTimestamp = snapshotFilter.tryToLocateCellCommitTimestamp(hbaseTransaction.getEpoch(),
+ v, shadowCellCache);
+ if (commitTimestamp.isPresent() && hbaseTransaction.getStartTimestamp() >= commitTimestamp.get()) {
+ familyDeletionCache.put(Bytes.toString(CellUtil.cloneFamily(v)), commitTimestamp.get());
+ return ReturnCode.NEXT_COL;
+ }
+
+ // Continue getting the next version of the delete family,
+ // until we get one in the snapshot or move to next cell
+ return ReturnCode.SKIP;
+ }
+
+ if (familyDeletionCache.containsKey(Bytes.toString(CellUtil.cloneFamily(v)))
+ && familyDeletionCache.get(Bytes.toString(CellUtil.cloneFamily(v))) >= v.getTimestamp()) {
+ return ReturnCode.NEXT_COL;
+ }
+
+ if (isCellInSnapshot(v)) {
+
+ if (CellUtils.isTombstone(v)) {
+ return ReturnCode.NEXT_COL;
+ }
+
+ if (hbaseTransaction.getVisibilityLevel() == AbstractTransaction.VisibilityLevel.SNAPSHOT) {
+ return runUserFilter(v, ReturnCode.INCLUDE_AND_NEXT_COL);
+ } else if (hbaseTransaction.getVisibilityLevel() == AbstractTransaction.VisibilityLevel.SNAPSHOT_ALL) {
+ if (snapshotFilter.getTSIfInTransaction(v, hbaseTransaction).isPresent()) {
+ return runUserFilter(v, ReturnCode.INCLUDE);
+ } else {
+ return runUserFilter(v, ReturnCode.INCLUDE_AND_NEXT_COL);
+ }
+ }
+ }
+ return ReturnCode.SKIP;
+ }
+
+
+ private ReturnCode runUserFilter(Cell v, ReturnCode snapshotReturn)
+ throws IOException {
+
+ if (userFilter == null) {
+ return snapshotReturn;
+ }
+
+ ReturnCode userRes = userFilter.filterKeyValue(v);
+ switch (userRes) {
+ case INCLUDE:
+ return snapshotReturn;
+ case SKIP:
+ return (snapshotReturn == ReturnCode.INCLUDE) ? ReturnCode.SKIP: ReturnCode.NEXT_COL;
+ default:
+ return userRes;
+ }
+
+ }
+
+
+ private boolean isCellInSnapshot(Cell v) throws IOException {
+ if (shadowCellCache.containsKey(v.getTimestamp()) &&
+ hbaseTransaction.getStartTimestamp() >= shadowCellCache.get(v.getTimestamp())) {
+ return true;
+ }
+ if (snapshotFilter.getTSIfInTransaction(v, hbaseTransaction).isPresent()) {
+ return true;
+ }
+ Optional<Long> commitTS = snapshotFilter.getTSIfInSnapshot(v, hbaseTransaction, shadowCellCache);
+ if (commitTS.isPresent()) {
+ shadowCellCache.put(v.getTimestamp(), commitTS.get());
+ return true;
+ }
+ return false;
+ }
+
+
+ @Override
+ public void reset() throws IOException {
+ shadowCellCache.clear();
+ familyDeletionCache.clear();
+ if (userFilter != null) {
+ userFilter.reset();
+ }
+ }
+
+ @Override
+ public boolean filterRow() throws IOException {
+ if (userFilter != null) {
+ return userFilter.filterRow();
+ }
+ return super.filterRow();
+ }
+
+
+ @Override
+ public boolean filterRowKey(byte[] buffer, int offset, int length) throws IOException {
+ if (userFilter != null) {
+ return userFilter.filterRowKey(buffer, offset, length);
--- End diff --
I think this is ok (this is what Tephra does).
---
[GitHub] incubator-omid pull request #41: [OMID-102] Support for user Filter when usi...
Posted by yonigottesman <gi...@git.apache.org>.
GitHub user yonigottesman reopened a pull request:
https://github.com/apache/incubator-omid/pull/41
[OMID-102] Support for user Filter when using coprocessor for snapshot filtering
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/yonigottesman/incubator-omid scan_filters
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/incubator-omid/pull/41.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #41
----
commit 41554ec4c3cdfdbfd271e9adbef866b3653c3382
Author: Yonatan Gottesman <yo...@...>
Date: 2018-07-25T06:40:14Z
Support for user Filter when using coprocessor for snapshot filtering
commit ba575384dcdca427bf80e55dcee58a1ed93bf744
Author: Yonatan Gottesman <yo...@...>
Date: 2018-07-25T07:41:42Z
In coprocessor filtering, get shadow cell of delete family before going to commit table
commit 26469a0893b4c9bc36e68d66583951fb6e82cf9d
Author: Yonatan Gottesman <yo...@...>
Date: 2018-07-26T08:15:27Z
add inMemoryCommitTable client option in omid coprocessor for testing
commit 8d632670ab580ca04e0c5cec555211d1fa71a11a
Author: Yonatan Gottesman <yo...@...>
Date: 2018-07-31T11:29:13Z
Fix visibilityFilter to check if delete family is in current TX
commit b2980c7cea2ef9c83a772d4ce3bd9a82f5ae4d81
Author: Yonatan Gottesman <yo...@...>
Date: 2018-08-01T09:34:00Z
Merge OMID-105 changes
----
---
[GitHub] incubator-omid pull request #41: [OMID-102] Support for user Filter when usi...
Posted by yonigottesman <gi...@git.apache.org>.
Github user yonigottesman closed the pull request at:
https://github.com/apache/incubator-omid/pull/41
---
[GitHub] incubator-omid issue #41: [OMID-102] Support for user Filter when using copr...
Posted by yonigottesman <gi...@git.apache.org>.
Github user yonigottesman commented on the issue:
https://github.com/apache/incubator-omid/pull/41
All FlappingTransactionIT tests pass.
---
[GitHub] incubator-omid pull request #41: [OMID-102] Support for user Filter when usi...
Posted by yonigottesman <gi...@git.apache.org>.
Github user yonigottesman commented on a diff in the pull request:
https://github.com/apache/incubator-omid/pull/41#discussion_r208160776
--- Diff: hbase-common/src/main/java/org/apache/omid/transaction/CellUtils.java ---
@@ -227,24 +252,26 @@ private static boolean endsWith(byte[] value, int offset, int length, byte[] suf
return result == 0;
}
+ private static boolean startsWith(byte[] value, int offset, int length, byte[] prefix) {
--- End diff --
No, because Bytes.startsWith doesn't work with an offset.
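For illustration, an offset-aware prefix check with the same signature as the helper in the diff could look like the sketch below. The body of the PR's method is not shown in this thread, so this is an assumed implementation, and the enclosing class name ByteRanges is hypothetical:

```java
// Hypothetical holder class; only the method signature comes from the quoted diff.
final class ByteRanges {

    // True when the range value[offset, offset + length) begins with prefix.
    static boolean startsWith(byte[] value, int offset, int length, byte[] prefix) {
        if (length < prefix.length) {
            return false; // the range is too short to contain the prefix
        }
        for (int i = 0; i < prefix.length; i++) {
            if (value[offset + i] != prefix[i]) {
                return false;
            }
        }
        return true;
    }
}
```

Working on (array, offset, length) lets the check run directly against a Cell's backing array without first copying the qualifier out.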
---
[GitHub] incubator-omid pull request #41: [OMID-102] Support for user Filter when usi...
Posted by ohadshacham <gi...@git.apache.org>.
Github user ohadshacham commented on a diff in the pull request:
https://github.com/apache/incubator-omid/pull/41#discussion_r206848753
--- Diff: hbase-coprocessor/src/main/java/org/apache/omid/transaction/OmidSnapshotFilter.java ---
@@ -83,92 +76,97 @@ public void start(CoprocessorEnvironment env) throws IOException {
if (commitTableName != null) {
commitTableConf.setTableName(commitTableName);
}
- if (commitTableClient == null) {
- commitTableClient = initAndGetCommitTableClient();
- }
- snapshotFilter = new SnapshotFilterImpl(commitTableClient);
-
LOG.info("Snapshot filter started");
}
@Override
public void stop(CoprocessorEnvironment e) throws IOException {
LOG.info("Stopping snapshot filter coprocessor");
- commitTableClient.close();
+ if (snapshotFilterQueue != null) {
+ for (SnapshotFilter snapshotFilter: snapshotFilterQueue) {
+ ((SnapshotFilterImpl)snapshotFilter).getCommitTableClient().close();
+ }
+ }
LOG.info("Snapshot filter stopped");
}
- public void setCommitTableClient(CommitTable.Client commitTableClient) {
- this.commitTableClient = commitTableClient;
- this.snapshotFilter.setCommitTableClient(commitTableClient);
- }
@Override
- public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> c, Get get, List<Cell> result) throws IOException {
-
- if (get.getAttribute(CellUtils.CLIENT_GET_ATTRIBUTE) == null) return;
-
- try {
- get.setAttribute(CellUtils.CLIENT_GET_ATTRIBUTE, null);
- RegionAccessWrapper regionAccessWrapper = new RegionAccessWrapper(HBaseShims.getRegionCoprocessorRegion(c.getEnvironment()));
- Result res = regionAccessWrapper.get(get); // get parameters were set at the client side
-
- snapshotFilter.setTableAccessWrapper(regionAccessWrapper);
+ public void postGetOp(ObserverContext<RegionCoprocessorEnvironment> e, Get get, List<Cell> results)
+ throws IOException {
+ if (get.getFilter() != null) {
+ //This get had a filter and used a commit table client that must put back
+ assert (get.getFilter() instanceof TransactionVisibilityFilter);
+ TransactionVisibilityFilter filter = (TransactionVisibilityFilter)get.getFilter();
+ snapshotFilterQueue.add((SnapshotFilterImpl)filter.getSnapshotFilter());
+ }
+ }
- List<Cell> filteredKeyValues = Collections.emptyList();
- if (!res.isEmpty()) {
- TSOProto.Transaction transaction = TSOProto.Transaction.parseFrom(get.getAttribute(CellUtils.TRANSACTION_ATTRIBUTE));
- long id = transaction.getTimestamp();
- long readTs = transaction.getReadTimestamp();
- long epoch = transaction.getEpoch();
- VisibilityLevel visibilityLevel = VisibilityLevel.fromInteger(transaction.getVisibilityLevel());
+ @Override
+ public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> e, Get get, List<Cell> results)
+ throws IOException {
+ if (get.getAttribute(CellUtils.CLIENT_GET_ATTRIBUTE) == null) return;
- HBaseTransaction hbaseTransaction = new HBaseTransaction(id, readTs, visibilityLevel, epoch, new HashSet<HBaseCellId>(), new HashSet<HBaseCellId>(), null);
- filteredKeyValues = snapshotFilter.filterCellsForSnapshot(res.listCells(), hbaseTransaction, get.getMaxVersions(), new HashMap<String, Long>(), get.getAttributesMap());
- }
+ HBaseTransaction hbaseTransaction = getHBaseTransaction(get.getAttribute(CellUtils.TRANSACTION_ATTRIBUTE));
+ SnapshotFilterImpl snapshotFilter = getSnapshotFilter(e);
+ get.setMaxVersions();
--- End diff --
I would add a comment explaining that we set max versions because the Omid filtering is done in the VisibilityFilter.
---
[GitHub] incubator-omid pull request #41: [OMID-102] Support for user Filter when usi...
Posted by JamesRTaylor <gi...@git.apache.org>.
Github user JamesRTaylor commented on a diff in the pull request:
https://github.com/apache/incubator-omid/pull/41#discussion_r206927180
--- Diff: hbase-coprocessor/src/main/java/org/apache/omid/transaction/TransactionVisibilityFilter.java ---
@@ -0,0 +1,248 @@
+package org.apache.omid.transaction;
+
+import com.google.common.base.Optional;
+import com.sun.istack.Nullable;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterBase;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import java.io.IOException;
+import java.util.*;
+
+public class TransactionVisibilityFilter extends FilterBase {
+
+ // optional sub-filter to apply to visible cells
+ private final Filter userFilter;
+ private final SnapshotFilterImpl snapshotFilter;
+ private final Map<Long ,Long> shadowCellCache;
+ private final HBaseTransaction hbaseTransaction;
+ private final Map<String, Long> familyDeletionCache;
+
+ public SnapshotFilter getSnapshotFilter() {
+ return snapshotFilter;
+ }
+
+ public TransactionVisibilityFilter(@Nullable Filter cellFilter,
+ SnapshotFilterImpl snapshotFilter,
+ HBaseTransaction hbaseTransaction) {
+ this.userFilter = cellFilter;
+ this.snapshotFilter = snapshotFilter;
+ shadowCellCache = new HashMap<>();
+ this.hbaseTransaction = hbaseTransaction;
+ familyDeletionCache = new HashMap<String, Long>();
+ }
+
+ @Override
+ public ReturnCode filterKeyValue(Cell v) throws IOException {
+ if (CellUtils.isShadowCell(v)) {
+ Long commitTs = Bytes.toLong(CellUtil.cloneValue(v));
+ shadowCellCache.put(v.getTimestamp(), commitTs);
+ // Continue getting shadow cells until one of them fits this transaction
+ if (hbaseTransaction.getStartTimestamp() >= commitTs) {
+ return ReturnCode.NEXT_COL;
+ } else {
+ return ReturnCode.SKIP;
+ }
+ } else if (CellUtils.isFamilyDeleteCell(v)) {
+ //Delete is part of this transaction
+ if (snapshotFilter.getTSIfInTransaction(v, hbaseTransaction).isPresent()) {
+ familyDeletionCache.put(Bytes.toString(CellUtil.cloneFamily(v)), v.getTimestamp());
+ return ReturnCode.NEXT_COL;
+ }
+
+ if (shadowCellCache.containsKey(v.getTimestamp()) &&
+ hbaseTransaction.getStartTimestamp() >= shadowCellCache.get(v.getTimestamp())) {
+ familyDeletionCache.put(Bytes.toString(CellUtil.cloneFamily(v)), shadowCellCache.get(v.getTimestamp()));
--- End diff --
Avoid allocating memory within this call. Either use TreeMap<byte[],Cell> and don't do the copy or use HashMap<ImmutableBytesPtr,Cell>. You can copy/paste ImmutableBytesPtr from Phoenix - it's a wrapper around byte[] that handles equality and hashCode without doing any copying.
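A rough sketch of the wrapper idea, assuming the backing array is not mutated while the key is in a map. ByteRangeKey is a hypothetical stand-in written for this example; it is not Phoenix's ImmutableBytesPtr and does not reproduce its API:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical byte-range key: compares and hashes a slice of an existing
// array in place, so no byte[] copy or String decoding is needed.
final class ByteRangeKey {
    private final byte[] buf;
    private final int offset;
    private final int length;
    private final int hash;

    ByteRangeKey(byte[] buf, int offset, int length) {
        this.buf = buf;
        this.offset = offset;
        this.length = length;
        int h = 1;
        for (int i = offset; i < offset + length; i++) {
            h = 31 * h + buf[i];
        }
        this.hash = h; // computed once over the slice contents
    }

    @Override
    public int hashCode() {
        return hash;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof ByteRangeKey)) return false;
        ByteRangeKey other = (ByteRangeKey) o;
        if (length != other.length) return false;
        for (int i = 0; i < length; i++) {
            if (buf[offset + i] != other.buf[other.offset + i]) return false;
        }
        return true;
    }
}
```

The wrapper object itself is still a small allocation per lookup; what it avoids is cloning the family bytes and converting them to a String on every call.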
---
[GitHub] incubator-omid pull request #41: [OMID-102] Support for user Filter when usi...
Posted by ohadshacham <gi...@git.apache.org>.
Github user ohadshacham commented on a diff in the pull request:
https://github.com/apache/incubator-omid/pull/41#discussion_r206843754
--- Diff: hbase-common/src/main/java/org/apache/omid/transaction/CellUtils.java ---
@@ -382,13 +385,16 @@ public int hashCode() {
hasher.putBytes(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
hasher.putBytes(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength());
int qualifierLength = cell.getQualifierLength();
+ int qualifierOffset = cell.getQualifierOffset();
if (isShadowCell()) { // Update qualifier length when qualifier is shadow cell
qualifierLength = qualifierLengthFromShadowCellQualifier(cell.getQualifierArray(),
cell.getQualifierOffset(),
cell.getQualifierLength());
+ qualifierOffset = qualifierOffset + 1;
--- End diff --
Will it work when the shadow cell prefix is absent, as in legacy data?
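One way to make the concern concrete: advance the offset only when the prefix is actually present. The sketch below is an assumption-laden illustration, not the PR's code. It uses a hypothetical single marker byte for both prefix and suffix, and the names QualifierRange and userQualifier are invented for this example:

```java
// Hypothetical helper; the marker value and layout are assumptions for illustration.
final class QualifierRange {
    static final byte MARKER = (byte) 0x80;

    // Returns {offset, length} of the user qualifier inside a shadow-cell
    // qualifier, tolerating legacy qualifiers that carry only the suffix.
    static int[] userQualifier(byte[] q, int offset, int length) {
        int start = offset;
        int end = offset + length;
        if (length > 0 && q[end - 1] == MARKER) {
            end--; // strip the trailing marker
        }
        if (end > start && q[start] == MARKER) {
            start++; // strip the leading marker only when it is present
        }
        return new int[] {start, end - start};
    }
}
```

A legacy user qualifier that happens to begin with the marker byte would be misread by this check, so a real fix would need a rule for that case.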
---
[GitHub] incubator-omid pull request #41: [OMID-102] Support for user Filter when usi...
Posted by ohadshacham <gi...@git.apache.org>.
Github user ohadshacham commented on a diff in the pull request:
https://github.com/apache/incubator-omid/pull/41#discussion_r206852241
--- Diff: hbase-coprocessor/src/main/java/org/apache/omid/transaction/TransactionVisibilityFilter.java ---
@@ -0,0 +1,248 @@
+package org.apache.omid.transaction;
+
+import com.google.common.base.Optional;
+import com.sun.istack.Nullable;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterBase;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import java.io.IOException;
+import java.util.*;
+
+public class TransactionVisibilityFilter extends FilterBase {
+
+ // optional sub-filter to apply to visible cells
+ private final Filter userFilter;
+ private final SnapshotFilterImpl snapshotFilter;
+ private final Map<Long ,Long> shadowCellCache;
+ private final HBaseTransaction hbaseTransaction;
+ private final Map<String, Long> familyDeletionCache;
+
+ public SnapshotFilter getSnapshotFilter() {
+ return snapshotFilter;
+ }
+
+ public TransactionVisibilityFilter(@Nullable Filter cellFilter,
+ SnapshotFilterImpl snapshotFilter,
+ HBaseTransaction hbaseTransaction) {
+ this.userFilter = cellFilter;
+ this.snapshotFilter = snapshotFilter;
+ shadowCellCache = new HashMap<>();
+ this.hbaseTransaction = hbaseTransaction;
+ familyDeletionCache = new HashMap<String, Long>();
+ }
+
+ @Override
+ public ReturnCode filterKeyValue(Cell v) throws IOException {
+ if (CellUtils.isShadowCell(v)) {
+ Long commitTs = Bytes.toLong(CellUtil.cloneValue(v));
+ shadowCellCache.put(v.getTimestamp(), commitTs);
+ // Continue getting shadow cells until one of them fits this transaction
+ if (hbaseTransaction.getStartTimestamp() >= commitTs) {
--- End diff --
Why do we keep the ones that committed after the transaction's start timestamp?
---
[GitHub] incubator-omid pull request #41: [OMID-102] Support for user Filter when usi...
Posted by ohadshacham <gi...@git.apache.org>.
Github user ohadshacham commented on a diff in the pull request:
https://github.com/apache/incubator-omid/pull/41#discussion_r206858386
--- Diff: hbase-coprocessor/src/main/java/org/apache/omid/transaction/TransactionVisibilityFilter.java ---
@@ -0,0 +1,248 @@
+package org.apache.omid.transaction;
+
+import com.google.common.base.Optional;
+import com.sun.istack.Nullable;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterBase;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import java.io.IOException;
+import java.util.*;
+
+public class TransactionVisibilityFilter extends FilterBase {
+
+ // optional sub-filter to apply to visible cells
+ private final Filter userFilter;
+ private final SnapshotFilterImpl snapshotFilter;
+ private final Map<Long ,Long> shadowCellCache;
+ private final HBaseTransaction hbaseTransaction;
+ private final Map<String, Long> familyDeletionCache;
+
+ public SnapshotFilter getSnapshotFilter() {
+ return snapshotFilter;
+ }
+
+ public TransactionVisibilityFilter(@Nullable Filter cellFilter,
+ SnapshotFilterImpl snapshotFilter,
+ HBaseTransaction hbaseTransaction) {
+ this.userFilter = cellFilter;
+ this.snapshotFilter = snapshotFilter;
+ shadowCellCache = new HashMap<>();
+ this.hbaseTransaction = hbaseTransaction;
+ familyDeletionCache = new HashMap<String, Long>();
+ }
+
+ @Override
+ public ReturnCode filterKeyValue(Cell v) throws IOException {
+ if (CellUtils.isShadowCell(v)) {
+ Long commitTs = Bytes.toLong(CellUtil.cloneValue(v));
+ shadowCellCache.put(v.getTimestamp(), commitTs);
+ // Continue getting shadow cells until one of them fits this transaction
+ if (hbaseTransaction.getStartTimestamp() >= commitTs) {
+ return ReturnCode.NEXT_COL;
+ } else {
+ return ReturnCode.SKIP;
+ }
+ } else if (CellUtils.isFamilyDeleteCell(v)) {
+ //Delete is part of this transaction
+ if (snapshotFilter.getTSIfInTransaction(v, hbaseTransaction).isPresent()) {
+ familyDeletionCache.put(Bytes.toString(CellUtil.cloneFamily(v)), v.getTimestamp());
+ return ReturnCode.NEXT_COL;
+ }
+
+ if (shadowCellCache.containsKey(v.getTimestamp()) &&
+ hbaseTransaction.getStartTimestamp() >= shadowCellCache.get(v.getTimestamp())) {
+ familyDeletionCache.put(Bytes.toString(CellUtil.cloneFamily(v)), shadowCellCache.get(v.getTimestamp()));
+ return ReturnCode.NEXT_COL;
+ }
+
+ // Try to get shadow cell from region
+ final Get get = new Get(CellUtil.cloneRow(v));
+ get.setTimeStamp(v.getTimestamp()).setMaxVersions(1);
+ get.addColumn(CellUtil.cloneFamily(v), CellUtils.addShadowCellSuffix(CellUtils.FAMILY_DELETE_QUALIFIER));
+ Result deleteFamilySC = snapshotFilter.getTableAccessWrapper().get(get);
+
+ if (!deleteFamilySC.isEmpty() &&
+ Bytes.toLong(CellUtil.cloneValue(deleteFamilySC.rawCells()[0] )) < hbaseTransaction.getStartTimestamp()){
+ familyDeletionCache.put(Bytes.toString(CellUtil.cloneFamily(v)), Bytes.toLong(CellUtil.cloneValue(deleteFamilySC.rawCells()[0])));
+ return ReturnCode.NEXT_COL;
+ }
+
+ //At last go to commit table
+ Optional<Long> commitTimestamp = snapshotFilter.tryToLocateCellCommitTimestamp(hbaseTransaction.getEpoch(),
+ v, shadowCellCache);
+ if (commitTimestamp.isPresent() && hbaseTransaction.getStartTimestamp() >= commitTimestamp.get()) {
+ familyDeletionCache.put(Bytes.toString(CellUtil.cloneFamily(v)), commitTimestamp.get());
+ return ReturnCode.NEXT_COL;
+ }
+
--- End diff --
We should recheck the shadow cell here, to cover the case where someone updated the shadow cell and removed the entry from the commit table between lines 87 and 94.
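A minimal sketch of the lookup order this implies, not the actual Omid code. The class `CommitTimestampLookup` and its two supplier parameters are hypothetical stand-ins for the region read and the commit table read, and it uses `java.util.Optional` rather than the Guava `Optional` in the patch so that it stays self-contained.

```java
import java.util.Optional;
import java.util.function.Supplier;

// Hypothetical helper, not part of Omid: the two suppliers stand in for
// "read the shadow cell from the region" and "look up the commit table".
final class CommitTimestampLookup {

    static Optional<Long> locate(Supplier<Optional<Long>> shadowCellRead,
                                 Supplier<Optional<Long>> commitTableRead) {
        // 1. Shadow cell first.
        Optional<Long> commitTs = shadowCellRead.get();
        if (commitTs.isPresent()) {
            return commitTs;
        }
        // 2. Commit table next.
        commitTs = commitTableRead.get();
        if (commitTs.isPresent()) {
            return commitTs;
        }
        // 3. Recheck the shadow cell: a writer may have added it and removed
        //    the commit table entry between steps 1 and 2.
        return shadowCellRead.get();
    }
}
```

Without step 3, a cell whose commit information moved from the commit table to the shadow cell in that window would look uncommitted to the reader.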
---
[GitHub] incubator-omid pull request #41: [OMID-102] Support for user Filter when usi...
Posted by ohadshacham <gi...@git.apache.org>.
Github user ohadshacham commented on a diff in the pull request:
https://github.com/apache/incubator-omid/pull/41#discussion_r206839768
--- Diff: hbase-client/src/test/java/org/apache/omid/transaction/TestCellUtils.java ---
@@ -99,11 +99,11 @@ public void testShadowCellQualifiers(byte[] shadowCellSuffixToTest) throws IOExc
public void testCorrectMapingOfCellsToShadowCells() throws IOException {
// Create the required data
final byte[] validShadowCellQualifier =
- com.google.common.primitives.Bytes.concat(qualifier, SHADOW_CELL_SUFFIX);
+ com.google.common.primitives.Bytes.concat(new byte[1], qualifier, SHADOW_CELL_SUFFIX);
--- End diff --
Define SHADOW_CELL_PREFIX as "new byte[1]" and use it in place of every "new byte[1]".
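Roughly what the named constant could look like. This is only a sketch: the class name `ShadowCellQualifiers`, the helper `toShadowCellQualifier`, and the suffix value are assumptions for illustration and are not taken from CellUtils.

```java
// Hypothetical holder class; in the patch the constants would live in CellUtils.
final class ShadowCellQualifiers {

    // One zero byte, matching the "new byte[1]" used in the test above.
    static final byte[] SHADOW_CELL_PREFIX = new byte[1];

    // Placeholder value for illustration only; not the real Omid suffix.
    static final byte[] SHADOW_CELL_SUFFIX = {(byte) 0x7F};

    // Builds prefix + qualifier + suffix in a single allocation.
    static byte[] toShadowCellQualifier(byte[] qualifier) {
        byte[] out = new byte[SHADOW_CELL_PREFIX.length
                + qualifier.length
                + SHADOW_CELL_SUFFIX.length];
        int pos = 0;
        System.arraycopy(SHADOW_CELL_PREFIX, 0, out, pos, SHADOW_CELL_PREFIX.length);
        pos += SHADOW_CELL_PREFIX.length;
        System.arraycopy(qualifier, 0, out, pos, qualifier.length);
        pos += qualifier.length;
        System.arraycopy(SHADOW_CELL_SUFFIX, 0, out, pos, SHADOW_CELL_SUFFIX.length);
        return out;
    }
}
```

With the constant in one place, the tests and the production code cannot drift apart if the prefix ever changes.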
---
[GitHub] incubator-omid pull request #41: [OMID-102] Support for user Filter when usi...
Posted by JamesRTaylor <gi...@git.apache.org>.
Github user JamesRTaylor commented on a diff in the pull request:
https://github.com/apache/incubator-omid/pull/41#discussion_r207921192
--- Diff: hbase-coprocessor/src/main/java/org/apache/omid/transaction/TransactionVisibilityFilter.java ---
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.omid.transaction;
+
+import com.google.common.base.Optional;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterBase;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import java.io.IOException;
+import java.util.*;
+
+public class TransactionVisibilityFilter extends FilterBase {
+
+ // optional sub-filter to apply to visible cells
+ private final Filter userFilter;
+ private final SnapshotFilterImpl snapshotFilter;
+ private final Map<Long ,Long> commitCache;
+ private final HBaseTransaction hbaseTransaction;
+
+ // This cache is cleared when moving to the next row
+ // So no need to keep row name
+ private final Map<ImmutableBytesWritable, Long> familyDeletionCache;
+
+ public SnapshotFilter getSnapshotFilter() {
+ return snapshotFilter;
+ }
+
+ public TransactionVisibilityFilter(Filter cellFilter,
+ SnapshotFilterImpl snapshotFilter,
+ HBaseTransaction hbaseTransaction) {
+ this.userFilter = cellFilter;
+ this.snapshotFilter = snapshotFilter;
+ commitCache = new HashMap<>();
+ this.hbaseTransaction = hbaseTransaction;
+ familyDeletionCache = new HashMap<>();
+ }
+
+ @Override
+ public ReturnCode filterKeyValue(Cell v) throws IOException {
+ if (CellUtils.isShadowCell(v)) {
+ Long commitTs = Bytes.toLong(CellUtil.cloneValue(v));
+ commitCache.put(v.getTimestamp(), commitTs);
+ // Continue getting shadow cells until one of them fits this transaction
+ if (hbaseTransaction.getStartTimestamp() >= commitTs) {
+ return ReturnCode.NEXT_COL;
+ } else {
+ return ReturnCode.SKIP;
+ }
+ }
+
+ Optional<Long> ct = getCommitIfInSnapshot(v, CellUtils.isFamilyDeleteCell(v));
+ if (ct.isPresent()) {
+ commitCache.put(v.getTimestamp(), ct.get());
+ if (hbaseTransaction.getVisibilityLevel() == AbstractTransaction.VisibilityLevel.SNAPSHOT_ALL &&
+ snapshotFilter.getTSIfInTransaction(v, hbaseTransaction).isPresent()) {
+ return runUserFilter(v, ReturnCode.INCLUDE);
+ }
+ if (CellUtils.isFamilyDeleteCell(v)) {
+ familyDeletionCache.put(createImmutableBytesWritable(v), ct.get());
+ if (hbaseTransaction.getVisibilityLevel() == AbstractTransaction.VisibilityLevel.SNAPSHOT_ALL) {
+ return runUserFilter(v, ReturnCode.INCLUDE_AND_NEXT_COL);
+ } else {
+ return ReturnCode.NEXT_COL;
+ }
+ }
+ Long deleteCommit = familyDeletionCache.get(createImmutableBytesWritable(v));
+ if (deleteCommit != null && deleteCommit >= v.getTimestamp()) {
+ if (hbaseTransaction.getVisibilityLevel() == AbstractTransaction.VisibilityLevel.SNAPSHOT_ALL) {
+ return runUserFilter(v, ReturnCode.INCLUDE_AND_NEXT_COL);
+ } else {
+ return ReturnCode.NEXT_COL;
+ }
+ }
+ if (CellUtils.isTombstone(v)) {
+ if (hbaseTransaction.getVisibilityLevel() == AbstractTransaction.VisibilityLevel.SNAPSHOT_ALL) {
+ return runUserFilter(v, ReturnCode.INCLUDE_AND_NEXT_COL);
+ } else {
+ return ReturnCode.NEXT_COL;
+ }
+ }
+
+ return runUserFilter(v, ReturnCode.INCLUDE_AND_NEXT_COL);
+ }
+
+ return ReturnCode.SKIP;
+ }
+
+
+ private ImmutableBytesWritable createImmutableBytesWritable(Cell v) {
+ return new ImmutableBytesWritable(v.getFamilyArray(),
+ v.getFamilyOffset(),v.getFamilyLength());
+ }
+
+ private ReturnCode runUserFilter(Cell v, ReturnCode snapshotReturn)
+ throws IOException {
+ assert(snapshotReturn == ReturnCode.INCLUDE_AND_NEXT_COL || snapshotReturn == ReturnCode.INCLUDE);
+ if (userFilter == null) {
+ return snapshotReturn;
+ }
+
+ ReturnCode userRes = userFilter.filterKeyValue(v);
+ switch (userRes) {
+ case INCLUDE:
+ return snapshotReturn;
+ case SKIP:
+ return (snapshotReturn == ReturnCode.INCLUDE) ? ReturnCode.SKIP: ReturnCode.NEXT_COL;
+ default:
+ return userRes;
+ }
+
+ }
+
+ // For family delete cells, the sc hasn't arrived yet so get sc from region before going to ct
+ private Optional<Long> getCommitIfInSnapshot(Cell v, boolean getShadowCellBeforeCT) throws IOException {
+ Long cachedCommitTS = commitCache.get(v.getTimestamp());
+ if (cachedCommitTS != null && hbaseTransaction.getStartTimestamp() >= cachedCommitTS) {
+ return Optional.of(cachedCommitTS);
+ }
+ if (snapshotFilter.getTSIfInTransaction(v, hbaseTransaction).isPresent()) {
+ return Optional.of(v.getTimestamp());
+ }
+
+ if (getShadowCellBeforeCT) {
+
+ // Try to get shadow cell from region
+ final Get get = new Get(CellUtil.cloneRow(v));
+ get.setTimeStamp(v.getTimestamp()).setMaxVersions(1);
+ get.addColumn(CellUtil.cloneFamily(v), CellUtils.addShadowCellSuffixPrefix(CellUtils.FAMILY_DELETE_QUALIFIER));
+ Result shadowCell = snapshotFilter.getTableAccessWrapper().get(get);
+
+ if (!shadowCell.isEmpty() &&
+ Bytes.toLong(CellUtil.cloneValue(shadowCell.rawCells()[0] )) <= hbaseTransaction.getStartTimestamp()){
--- End diff --
Don't repeat Bytes.toLong(CellUtil.cloneValue(shadowCell.rawCells()[0] )) here. You want to minimize heap allocation during filtering.
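For illustration, a sketch of decoding the value once and reusing the local. It is stdlib-only: `ByteBuffer` stands in for the HBase `Bytes.toLong` call (both read a big-endian long), and the class and method names are hypothetical.

```java
import java.nio.ByteBuffer;
import java.util.Optional;

// Hypothetical helper: shows the "decode once, reuse the local" shape.
final class ShadowCellCheck {

    // Returns the commit timestamp if the shadow cell value is present and
    // committed at or before the reader's start timestamp.
    static Optional<Long> commitIfVisible(byte[] shadowCellValue, long startTimestamp) {
        if (shadowCellValue == null || shadowCellValue.length < Long.BYTES) {
            return Optional.empty();
        }
        // Decode once; wrap() does not copy the array.
        long commitTs = ByteBuffer.wrap(shadowCellValue).getLong();
        // The same local serves both the comparison and the returned value.
        return commitTs <= startTimestamp ? Optional.of(commitTs) : Optional.empty();
    }
}
```

In the filter itself the equivalent would be a single local holding the decoded long, used in both the condition and the return.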
---
[GitHub] incubator-omid pull request #41: [OMID-102] Support for user Filter when usi...
Posted by ohadshacham <gi...@git.apache.org>.
Github user ohadshacham commented on a diff in the pull request:
https://github.com/apache/incubator-omid/pull/41#discussion_r206861622
--- Diff: hbase-coprocessor/src/main/java/org/apache/omid/transaction/TransactionVisibilityFilter.java ---
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.omid.transaction;
+
+import com.google.common.base.Optional;
+import com.sun.istack.Nullable;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterBase;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import java.io.IOException;
+import java.util.*;
+
+public class TransactionVisibilityFilter extends FilterBase {
+
+ // optional sub-filter to apply to visible cells
+ private final Filter userFilter;
+ private final SnapshotFilterImpl snapshotFilter;
+ private final Map<Long ,Long> shadowCellCache;
+ private final HBaseTransaction hbaseTransaction;
+ private final Map<String, Long> familyDeletionCache;
+
+ public SnapshotFilter getSnapshotFilter() {
+ return snapshotFilter;
+ }
+
+ public TransactionVisibilityFilter(@Nullable Filter cellFilter,
+ SnapshotFilterImpl snapshotFilter,
+ HBaseTransaction hbaseTransaction) {
+ this.userFilter = cellFilter;
+ this.snapshotFilter = snapshotFilter;
+ shadowCellCache = new HashMap<>();
+ this.hbaseTransaction = hbaseTransaction;
+ familyDeletionCache = new HashMap<String, Long>();
+ }
+
+ @Override
+ public ReturnCode filterKeyValue(Cell v) throws IOException {
+ if (CellUtils.isShadowCell(v)) {
+ Long commitTs = Bytes.toLong(CellUtil.cloneValue(v));
+ shadowCellCache.put(v.getTimestamp(), commitTs);
+ // Continue getting shadow cells until one of them fits this transaction
+ if (hbaseTransaction.getStartTimestamp() >= commitTs) {
+ return ReturnCode.NEXT_COL;
+ } else {
+ return ReturnCode.SKIP;
+ }
+ } else if (CellUtils.isFamilyDeleteCell(v)) {
+ //Delete is part of this transaction
+ if (snapshotFilter.getTSIfInTransaction(v, hbaseTransaction).isPresent()) {
+ familyDeletionCache.put(Bytes.toString(CellUtil.cloneFamily(v)), v.getTimestamp());
+ return ReturnCode.NEXT_COL;
+ }
+
+ if (shadowCellCache.containsKey(v.getTimestamp()) &&
+ hbaseTransaction.getStartTimestamp() >= shadowCellCache.get(v.getTimestamp())) {
+ familyDeletionCache.put(Bytes.toString(CellUtil.cloneFamily(v)), shadowCellCache.get(v.getTimestamp()));
+ return ReturnCode.NEXT_COL;
+ }
+
+ // Try to get shadow cell from region
+ final Get get = new Get(CellUtil.cloneRow(v));
+ get.setTimeStamp(v.getTimestamp()).setMaxVersions(1);
+ get.addColumn(CellUtil.cloneFamily(v), CellUtils.addShadowCellSuffix(CellUtils.FAMILY_DELETE_QUALIFIER));
+ Result deleteFamilySC = snapshotFilter.getTableAccessWrapper().get(get);
+
+ if (!deleteFamilySC.isEmpty() &&
+ Bytes.toLong(CellUtil.cloneValue(deleteFamilySC.rawCells()[0] )) < hbaseTransaction.getStartTimestamp()){
+ familyDeletionCache.put(Bytes.toString(CellUtil.cloneFamily(v)), Bytes.toLong(CellUtil.cloneValue(deleteFamilySC.rawCells()[0])));
+ return ReturnCode.NEXT_COL;
+ }
+
+ //At last go to commit table
+ Optional<Long> commitTimestamp = snapshotFilter.tryToLocateCellCommitTimestamp(hbaseTransaction.getEpoch(),
+ v, shadowCellCache);
+ if (commitTimestamp.isPresent() && hbaseTransaction.getStartTimestamp() >= commitTimestamp.get()) {
+ familyDeletionCache.put(Bytes.toString(CellUtil.cloneFamily(v)), commitTimestamp.get());
+ return ReturnCode.NEXT_COL;
+ }
+
+ // Continue getting the next version of the delete family,
+ // until we get one in the snapshot or move to next cell
+ return ReturnCode.SKIP;
+ }
+
+ if (familyDeletionCache.containsKey(Bytes.toString(CellUtil.cloneFamily(v)))
+ && familyDeletionCache.get(Bytes.toString(CellUtil.cloneFamily(v))) >= v.getTimestamp()) {
+ return ReturnCode.NEXT_COL;
+ }
+
+ if (isCellInSnapshot(v)) {
+
+ if (CellUtils.isTombstone(v)) {
+ return ReturnCode.NEXT_COL;
+ }
+
+ if (hbaseTransaction.getVisibilityLevel() == AbstractTransaction.VisibilityLevel.SNAPSHOT) {
+ return runUserFilter(v, ReturnCode.INCLUDE_AND_NEXT_COL);
+ } else if (hbaseTransaction.getVisibilityLevel() == AbstractTransaction.VisibilityLevel.SNAPSHOT_ALL) {
+ if (snapshotFilter.getTSIfInTransaction(v, hbaseTransaction).isPresent()) {
+ return runUserFilter(v, ReturnCode.INCLUDE);
+ } else {
+ return runUserFilter(v, ReturnCode.INCLUDE_AND_NEXT_COL);
--- End diff --
I would add a comment here explaining that, since v is in the snapshot but not in the transaction, it is the last result for SNAPSHOT_ALL.
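Something along these lines, as a sketch only. The enum and the boolean parameter are simplified stand-ins for the HBase ReturnCode and the getTSIfInTransaction check, and the wording of the first comment is my reading of the code rather than anything stated in the patch.

```java
// Simplified stand-in for the SNAPSHOT_ALL branch of filterKeyValue.
final class SnapshotAllDecision {

    enum ReturnCode { INCLUDE, INCLUDE_AND_NEXT_COL }

    static ReturnCode forSnapshotAll(boolean writtenByThisTransaction) {
        if (writtenByThisTransaction) {
            // Written by the reading transaction: include it and keep
            // scanning older versions of this column.
            return ReturnCode.INCLUDE;
        }
        // v is in the snapshot but not in the transaction, so it is the
        // last result for SNAPSHOT_ALL: include it and move to the next column.
        return ReturnCode.INCLUDE_AND_NEXT_COL;
    }
}
```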
---
[GitHub] incubator-omid pull request #41: [OMID-102] Support for user Filter when usi...
Posted by JamesRTaylor <gi...@git.apache.org>.
Github user JamesRTaylor commented on a diff in the pull request:
https://github.com/apache/incubator-omid/pull/41#discussion_r207918413
--- Diff: hbase-common/src/main/java/org/apache/omid/transaction/CellUtils.java ---
@@ -227,24 +252,26 @@ private static boolean endsWith(byte[] value, int offset, int length, byte[] suf
return result == 0;
}
+ private static boolean startsWith(byte[] value, int offset, int length, byte[] prefix) {
--- End diff --
Can you just use Bytes.startsWith() instead?
---
[GitHub] incubator-omid pull request #41: [OMID-102] Support for user Filter when usi...
Posted by ohadshacham <gi...@git.apache.org>.
Github user ohadshacham commented on a diff in the pull request:
https://github.com/apache/incubator-omid/pull/41#discussion_r206866085
--- Diff: hbase-coprocessor/src/main/java/org/apache/omid/transaction/TransactionVisibilityFilter.java ---
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.omid.transaction;
+
+import com.google.common.base.Optional;
+import com.sun.istack.Nullable;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterBase;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import java.io.IOException;
+import java.util.*;
+
+public class TransactionVisibilityFilter extends FilterBase {
+
+ // optional sub-filter to apply to visible cells
+ private final Filter userFilter;
+ private final SnapshotFilterImpl snapshotFilter;
+ private final Map<Long ,Long> shadowCellCache;
+ private final HBaseTransaction hbaseTransaction;
+ private final Map<String, Long> familyDeletionCache;
+
+ public SnapshotFilter getSnapshotFilter() {
+ return snapshotFilter;
+ }
+
+ public TransactionVisibilityFilter(@Nullable Filter cellFilter,
+ SnapshotFilterImpl snapshotFilter,
+ HBaseTransaction hbaseTransaction) {
+ this.userFilter = cellFilter;
+ this.snapshotFilter = snapshotFilter;
+ shadowCellCache = new HashMap<>();
+ this.hbaseTransaction = hbaseTransaction;
+ familyDeletionCache = new HashMap<String, Long>();
+ }
+
+ @Override
+ public ReturnCode filterKeyValue(Cell v) throws IOException {
+ if (CellUtils.isShadowCell(v)) {
+ Long commitTs = Bytes.toLong(CellUtil.cloneValue(v));
+ shadowCellCache.put(v.getTimestamp(), commitTs);
+ // Continue getting shadow cells until one of them fits this transaction
+ if (hbaseTransaction.getStartTimestamp() >= commitTs) {
+ return ReturnCode.NEXT_COL;
+ } else {
+ return ReturnCode.SKIP;
+ }
+ } else if (CellUtils.isFamilyDeleteCell(v)) {
+ //Delete is part of this transaction
+ if (snapshotFilter.getTSIfInTransaction(v, hbaseTransaction).isPresent()) {
+ familyDeletionCache.put(Bytes.toString(CellUtil.cloneFamily(v)), v.getTimestamp());
+ return ReturnCode.NEXT_COL;
+ }
+
+ if (shadowCellCache.containsKey(v.getTimestamp()) &&
+ hbaseTransaction.getStartTimestamp() >= shadowCellCache.get(v.getTimestamp())) {
+ familyDeletionCache.put(Bytes.toString(CellUtil.cloneFamily(v)), shadowCellCache.get(v.getTimestamp()));
+ return ReturnCode.NEXT_COL;
+ }
+
+ // Try to get shadow cell from region
+ final Get get = new Get(CellUtil.cloneRow(v));
+ get.setTimeStamp(v.getTimestamp()).setMaxVersions(1);
+ get.addColumn(CellUtil.cloneFamily(v), CellUtils.addShadowCellSuffix(CellUtils.FAMILY_DELETE_QUALIFIER));
+ Result deleteFamilySC = snapshotFilter.getTableAccessWrapper().get(get);
+
+ if (!deleteFamilySC.isEmpty() &&
+ Bytes.toLong(CellUtil.cloneValue(deleteFamilySC.rawCells()[0] )) < hbaseTransaction.getStartTimestamp()){
+ familyDeletionCache.put(Bytes.toString(CellUtil.cloneFamily(v)), Bytes.toLong(CellUtil.cloneValue(deleteFamilySC.rawCells()[0])));
+ return ReturnCode.NEXT_COL;
+ }
+
+ //At last go to commit table
+ Optional<Long> commitTimestamp = snapshotFilter.tryToLocateCellCommitTimestamp(hbaseTransaction.getEpoch(),
+ v, shadowCellCache);
+ if (commitTimestamp.isPresent() && hbaseTransaction.getStartTimestamp() >= commitTimestamp.get()) {
+ familyDeletionCache.put(Bytes.toString(CellUtil.cloneFamily(v)), commitTimestamp.get());
+ return ReturnCode.NEXT_COL;
+ }
+
+ // Continue getting the next version of the delete family,
+ // until we get one in the snapshot or move to next cell
+ return ReturnCode.SKIP;
+ }
+
+ if (familyDeletionCache.containsKey(Bytes.toString(CellUtil.cloneFamily(v)))
+ && familyDeletionCache.get(Bytes.toString(CellUtil.cloneFamily(v))) >= v.getTimestamp()) {
+ return ReturnCode.NEXT_COL;
+ }
+
+ if (isCellInSnapshot(v)) {
+
+ if (CellUtils.isTombstone(v)) {
+ return ReturnCode.NEXT_COL;
+ }
+
+ if (hbaseTransaction.getVisibilityLevel() == AbstractTransaction.VisibilityLevel.SNAPSHOT) {
+ return runUserFilter(v, ReturnCode.INCLUDE_AND_NEXT_COL);
+ } else if (hbaseTransaction.getVisibilityLevel() == AbstractTransaction.VisibilityLevel.SNAPSHOT_ALL) {
+ if (snapshotFilter.getTSIfInTransaction(v, hbaseTransaction).isPresent()) {
+ return runUserFilter(v, ReturnCode.INCLUDE);
+ } else {
+ return runUserFilter(v, ReturnCode.INCLUDE_AND_NEXT_COL);
+ }
+ }
+ }
+ return ReturnCode.SKIP;
+ }
+
+
+ private ReturnCode runUserFilter(Cell v, ReturnCode snapshotReturn)
+ throws IOException {
+
+ if (userFilter == null) {
+ return snapshotReturn;
+ }
+
+ ReturnCode userRes = userFilter.filterKeyValue(v);
+ switch (userRes) {
+ case INCLUDE:
+ return snapshotReturn;
+ case SKIP:
+ return (snapshotReturn == ReturnCode.INCLUDE) ? ReturnCode.SKIP: ReturnCode.NEXT_COL;
+ default:
+ return userRes;
+ }
+
+ }
+
+
+ private boolean isCellInSnapshot(Cell v) throws IOException {
+ if (shadowCellCache.containsKey(v.getTimestamp()) &&
--- End diff --
Two collection calls here (containsKey followed by get); a single get with a null check would be enough.
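A minimal sketch of the single-lookup form, assuming the cache never stores null values. The class and method names are hypothetical; the map corresponds to shadowCellCache (cell timestamp to commit timestamp).

```java
import java.util.Map;

// Hypothetical helper showing one map lookup instead of containsKey + get.
final class CommitCacheLookup {

    static boolean isCommittedInSnapshot(Map<Long, Long> cache,
                                         long cellTimestamp,
                                         long startTimestamp) {
        // One hash lookup; null means "not cached".
        Long commitTs = cache.get(cellTimestamp);
        return commitTs != null && startTimestamp >= commitTs;
    }
}
```

This halves the hashing work on a path that runs once per cell.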
---
[GitHub] incubator-omid pull request #41: [OMID-102] Support for user Filter when usi...
Posted by JamesRTaylor <gi...@git.apache.org>.
Github user JamesRTaylor commented on a diff in the pull request:
https://github.com/apache/incubator-omid/pull/41#discussion_r207919955
--- Diff: hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestSnapshotFilter.java ---
@@ -226,8 +222,115 @@ public void testGetFirstResult() throws Throwable {
}
@Test(timeOut = 60_000)
- public void testGetSecondResult() throws Throwable {
--- End diff --
Is there a test which would have failed without this patch (i.e. one that demonstrates the need for having the visibility filtering done as a pure HBase filter)?
---
[GitHub] incubator-omid issue #41: [OMID-102] Support for user Filter when using copr...
Posted by JamesRTaylor <gi...@git.apache.org>.
Github user JamesRTaylor commented on the issue:
https://github.com/apache/incubator-omid/pull/41
Nice work, @yonigottesman. I made a few minor comments, @ohadshacham. My main question: do the Phoenix unit tests FlappingTransactionIT.testInflightUpdateNotSeen() and testInflightDeleteNotSeen() pass with this change? You can try running them from the omid2 feature branch in Phoenix against the phoenix-integration branch in omid2 with your patch applied.
---
[GitHub] incubator-omid pull request #41: [OMID-102] Support for user Filter when usi...
Posted by ohadshacham <gi...@git.apache.org>.
Github user ohadshacham commented on a diff in the pull request:
https://github.com/apache/incubator-omid/pull/41#discussion_r206867892
--- Diff: hbase-coprocessor/src/main/java/org/apache/omid/transaction/TransactionVisibilityFilter.java ---
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.omid.transaction;
+
+import com.google.common.base.Optional;
+import com.sun.istack.Nullable;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterBase;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import java.io.IOException;
+import java.util.*;
+
+public class TransactionVisibilityFilter extends FilterBase {
+
+ // optional sub-filter to apply to visible cells
+ private final Filter userFilter;
+ private final SnapshotFilterImpl snapshotFilter;
+ private final Map<Long ,Long> shadowCellCache;
+ private final HBaseTransaction hbaseTransaction;
+ private final Map<String, Long> familyDeletionCache;
+
+ public SnapshotFilter getSnapshotFilter() {
+ return snapshotFilter;
+ }
+
+ public TransactionVisibilityFilter(@Nullable Filter cellFilter,
+ SnapshotFilterImpl snapshotFilter,
+ HBaseTransaction hbaseTransaction) {
+ this.userFilter = cellFilter;
+ this.snapshotFilter = snapshotFilter;
+ shadowCellCache = new HashMap<>();
+ this.hbaseTransaction = hbaseTransaction;
+ familyDeletionCache = new HashMap<String, Long>();
+ }
+
+ @Override
+ public ReturnCode filterKeyValue(Cell v) throws IOException {
+ if (CellUtils.isShadowCell(v)) {
+ Long commitTs = Bytes.toLong(CellUtil.cloneValue(v));
+ shadowCellCache.put(v.getTimestamp(), commitTs);
+ // Continue getting shadow cells until one of them fits this transaction
+ if (hbaseTransaction.getStartTimestamp() >= commitTs) {
+ return ReturnCode.NEXT_COL;
+ } else {
+ return ReturnCode.SKIP;
+ }
+ } else if (CellUtils.isFamilyDeleteCell(v)) {
+ //Delete is part of this transaction
+ if (snapshotFilter.getTSIfInTransaction(v, hbaseTransaction).isPresent()) {
+ familyDeletionCache.put(Bytes.toString(CellUtil.cloneFamily(v)), v.getTimestamp());
+ return ReturnCode.NEXT_COL;
+ }
+
+ if (shadowCellCache.containsKey(v.getTimestamp()) &&
+ hbaseTransaction.getStartTimestamp() >= shadowCellCache.get(v.getTimestamp())) {
+ familyDeletionCache.put(Bytes.toString(CellUtil.cloneFamily(v)), shadowCellCache.get(v.getTimestamp()));
+ return ReturnCode.NEXT_COL;
+ }
+
+ // Try to get shadow cell from region
+ final Get get = new Get(CellUtil.cloneRow(v));
+ get.setTimeStamp(v.getTimestamp()).setMaxVersions(1);
+ get.addColumn(CellUtil.cloneFamily(v), CellUtils.addShadowCellSuffix(CellUtils.FAMILY_DELETE_QUALIFIER));
+ Result deleteFamilySC = snapshotFilter.getTableAccessWrapper().get(get);
+
+ if (!deleteFamilySC.isEmpty() &&
+ Bytes.toLong(CellUtil.cloneValue(deleteFamilySC.rawCells()[0] )) < hbaseTransaction.getStartTimestamp()){
+ familyDeletionCache.put(Bytes.toString(CellUtil.cloneFamily(v)), Bytes.toLong(CellUtil.cloneValue(deleteFamilySC.rawCells()[0])));
+ return ReturnCode.NEXT_COL;
+ }
+
+ //At last go to commit table
+ Optional<Long> commitTimestamp = snapshotFilter.tryToLocateCellCommitTimestamp(hbaseTransaction.getEpoch(),
+ v, shadowCellCache);
+ if (commitTimestamp.isPresent() && hbaseTransaction.getStartTimestamp() >= commitTimestamp.get()) {
+ familyDeletionCache.put(Bytes.toString(CellUtil.cloneFamily(v)), commitTimestamp.get());
+ return ReturnCode.NEXT_COL;
+ }
+
+ // Continue getting the next version of the delete family,
+ // until we get one in the snapshot or move to next cell
+ return ReturnCode.SKIP;
+ }
+
+ if (familyDeletionCache.containsKey(Bytes.toString(CellUtil.cloneFamily(v)))
+ && familyDeletionCache.get(Bytes.toString(CellUtil.cloneFamily(v))) >= v.getTimestamp()) {
+ return ReturnCode.NEXT_COL;
+ }
+
+ if (isCellInSnapshot(v)) {
+
+ if (CellUtils.isTombstone(v)) {
+ return ReturnCode.NEXT_COL;
+ }
+
+ if (hbaseTransaction.getVisibilityLevel() == AbstractTransaction.VisibilityLevel.SNAPSHOT) {
+ return runUserFilter(v, ReturnCode.INCLUDE_AND_NEXT_COL);
+ } else if (hbaseTransaction.getVisibilityLevel() == AbstractTransaction.VisibilityLevel.SNAPSHOT_ALL) {
+ if (snapshotFilter.getTSIfInTransaction(v, hbaseTransaction).isPresent()) {
+ return runUserFilter(v, ReturnCode.INCLUDE);
+ } else {
+ return runUserFilter(v, ReturnCode.INCLUDE_AND_NEXT_COL);
+ }
+ }
+ }
+ return ReturnCode.SKIP;
+ }
+
+
+ private ReturnCode runUserFilter(Cell v, ReturnCode snapshotReturn)
+ throws IOException {
+
+ if (userFilter == null) {
+ return snapshotReturn;
+ }
+
+ ReturnCode userRes = userFilter.filterKeyValue(v);
+ switch (userRes) {
+ case INCLUDE:
+ return snapshotReturn;
+ case SKIP:
+ return (snapshotReturn == ReturnCode.INCLUDE) ? ReturnCode.SKIP: ReturnCode.NEXT_COL;
+ default:
+ return userRes;
+ }
+
+ }
+
+
+ private boolean isCellInSnapshot(Cell v) throws IOException {
+ if (shadowCellCache.containsKey(v.getTimestamp()) &&
+ hbaseTransaction.getStartTimestamp() >= shadowCellCache.get(v.getTimestamp())) {
+ return true;
+ }
+ if (snapshotFilter.getTSIfInTransaction(v, hbaseTransaction).isPresent()) {
+ return true;
+ }
+ Optional<Long> commitTS = snapshotFilter.getTSIfInSnapshot(v, hbaseTransaction, shadowCellCache);
+ if (commitTS.isPresent()) {
+ shadowCellCache.put(v.getTimestamp(), commitTS.get());
+ return true;
+ }
+ return false;
+ }
+
+
+ @Override
+ public void reset() throws IOException {
+ shadowCellCache.clear();
+ familyDeletionCache.clear();
+ if (userFilter != null) {
+ userFilter.reset();
+ }
+ }
+
+ @Override
+ public boolean filterRow() throws IOException {
+ if (userFilter != null) {
+ return userFilter.filterRow();
+ }
+ return super.filterRow();
+ }
+
+
+ @Override
+ public boolean filterRowKey(byte[] buffer, int offset, int length) throws IOException {
+ if (userFilter != null) {
+ return userFilter.filterRowKey(buffer, offset, length);
--- End diff --
Is this function called before each filterRowKey(Cell)? That's what the documentation says. If so, the whole implementation should be different...
"Filters a row based on the row key. If this returns true, the entire row will be excluded. If false, each KeyValue in the row will be passed to filterCell(Cell) below"
---
[GitHub] incubator-omid pull request #41: [OMID-102] Support for user Filter when usi...
Posted by JamesRTaylor <gi...@git.apache.org>.
Github user JamesRTaylor commented on a diff in the pull request:
https://github.com/apache/incubator-omid/pull/41#discussion_r207918920
--- Diff: hbase-client/src/main/java/org/apache/omid/transaction/HTableAccessWrapper.java ---
@@ -20,10 +20,7 @@
import java.io.IOException;
import java.util.List;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.*;
--- End diff --
Was the use of wildcarding here intentional? Not sure about Omid, but in Phoenix we always explicitly specify all the imports.
---
[GitHub] incubator-omid pull request #41: [OMID-102] Support for user Filter when usi...
Posted by ohadshacham <gi...@git.apache.org>.
Github user ohadshacham commented on a diff in the pull request:
https://github.com/apache/incubator-omid/pull/41#discussion_r206849947
--- Diff: hbase-coprocessor/src/main/java/org/apache/omid/transaction/TransactionVisibilityFilter.java ---
@@ -0,0 +1,248 @@
+package org.apache.omid.transaction;
+
+import com.google.common.base.Optional;
+import com.sun.istack.Nullable;
--- End diff --
Please use a different Nullable annotation here (not com.sun.istack.Nullable).
---
[GitHub] incubator-omid pull request #41: [OMID-102] Support for user Filter when usi...
Posted by ohadshacham <gi...@git.apache.org>.
Github user ohadshacham commented on a diff in the pull request:
https://github.com/apache/incubator-omid/pull/41#discussion_r206843655
--- Diff: hbase-common/src/main/java/org/apache/omid/transaction/CellUtils.java ---
@@ -362,7 +365,7 @@ public boolean equals(Object o) {
cell.getQualifierOffset(),
cell.getQualifierLength());
if (!matchingQualifier(otherCell,
- cell.getQualifierArray(), cell.getQualifierOffset(), qualifierLength)) {
+ cell.getQualifierArray(), cell.getQualifierOffset() + 1, qualifierLength)) {
--- End diff --
Will this work when the shadow cell prefix is absent, as in legacy data?
---
[GitHub] incubator-omid pull request #41: [OMID-102] Support for user Filter when usi...
Posted by JamesRTaylor <gi...@git.apache.org>.
Github user JamesRTaylor commented on a diff in the pull request:
https://github.com/apache/incubator-omid/pull/41#discussion_r206934998
--- Diff: hbase-coprocessor/src/test/java/org/apache/omid/transaction/TestSnapshotFilter.java ---
@@ -226,8 +222,115 @@ public void testGetFirstResult() throws Throwable {
}
@Test(timeOut = 60_000)
- public void testGetSecondResult() throws Throwable {
--- End diff --
Would be good to have a test that uses a scan with FirstKeyOnlyFilter that would get incorrect results without this new implementation.
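To illustrate the kind of case such a test could cover, here is a minimal, self-contained sketch. It does not use HBase or Omid classes: `Cell`, `visible`, and both methods are hypothetical stand-ins, and the failure mode shown (the first-key-only rule picking a cell that the snapshot later hides) is an assumption about what could go wrong, not a confirmed reproduction.

```java
import java.util.ArrayList;
import java.util.List;

public class FirstKeyOnlySketch {
    /** Hypothetical cell: a qualifier plus the commit timestamp of its writer. */
    public static final class Cell {
        final String qualifier;
        final long commitTs;
        public Cell(String qualifier, long commitTs) {
            this.qualifier = qualifier;
            this.commitTs = commitTs;
        }
    }

    /** Simplified snapshot rule: a cell is visible if it committed at or before startTs. */
    public static boolean visible(Cell c, long startTs) {
        return c.commitTs <= startTs;
    }

    /** First-key-only applied first, visibility afterwards: the kept cell may be invisible. */
    public static List<String> userFilterThenVisibility(List<Cell> row, long startTs) {
        List<String> out = new ArrayList<>();
        if (!row.isEmpty() && visible(row.get(0), startTs)) {
            out.add(row.get(0).qualifier);
        }
        return out;
    }

    /** Visibility first: the first-key-only rule is applied to visible cells only. */
    public static List<String> visibilityThenUserFilter(List<Cell> row, long startTs) {
        List<String> out = new ArrayList<>();
        for (Cell c : row) {
            if (visible(c, startTs)) {
                out.add(c.qualifier);
                break; // keep the first visible cell only
            }
        }
        return out;
    }
}
```

With a row whose first cell committed after the reader's start timestamp, the first ordering returns nothing while the second returns the first visible cell. A real test would presumably assert the second behavior through a scan.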
---
[GitHub] incubator-omid pull request #41: [OMID-102] Support for user Filter when usi...
Posted by ohadshacham <gi...@git.apache.org>.
Github user ohadshacham commented on a diff in the pull request:
https://github.com/apache/incubator-omid/pull/41#discussion_r206862264
--- Diff: hbase-coprocessor/src/main/java/org/apache/omid/transaction/TransactionVisibilityFilter.java ---
@@ -0,0 +1,248 @@
+package org.apache.omid.transaction;
+
+import com.google.common.base.Optional;
+import com.sun.istack.Nullable;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterBase;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import java.io.IOException;
+import java.util.*;
+
+public class TransactionVisibilityFilter extends FilterBase {
+
+ // optional sub-filter to apply to visible cells
+ private final Filter userFilter;
+ private final SnapshotFilterImpl snapshotFilter;
+ private final Map<Long ,Long> shadowCellCache;
+ private final HBaseTransaction hbaseTransaction;
+ private final Map<String, Long> familyDeletionCache;
+
+ public SnapshotFilter getSnapshotFilter() {
+ return snapshotFilter;
+ }
+
+ public TransactionVisibilityFilter(@Nullable Filter cellFilter,
+ SnapshotFilterImpl snapshotFilter,
+ HBaseTransaction hbaseTransaction) {
+ this.userFilter = cellFilter;
+ this.snapshotFilter = snapshotFilter;
+ shadowCellCache = new HashMap<>();
+ this.hbaseTransaction = hbaseTransaction;
+ familyDeletionCache = new HashMap<String, Long>();
+ }
+
+ @Override
+ public ReturnCode filterKeyValue(Cell v) throws IOException {
+ if (CellUtils.isShadowCell(v)) {
+ Long commitTs = Bytes.toLong(CellUtil.cloneValue(v));
+ shadowCellCache.put(v.getTimestamp(), commitTs);
+ // Continue getting shadow cells until one of them fits this transaction
+ if (hbaseTransaction.getStartTimestamp() >= commitTs) {
+ return ReturnCode.NEXT_COL;
+ } else {
+ return ReturnCode.SKIP;
+ }
+ } else if (CellUtils.isFamilyDeleteCell(v)) {
+ //Delete is part of this transaction
+ if (snapshotFilter.getTSIfInTransaction(v, hbaseTransaction).isPresent()) {
+ familyDeletionCache.put(Bytes.toString(CellUtil.cloneFamily(v)), v.getTimestamp());
+ return ReturnCode.NEXT_COL;
+ }
+
+ if (shadowCellCache.containsKey(v.getTimestamp()) &&
+ hbaseTransaction.getStartTimestamp() >= shadowCellCache.get(v.getTimestamp())) {
+ familyDeletionCache.put(Bytes.toString(CellUtil.cloneFamily(v)), shadowCellCache.get(v.getTimestamp()));
+ return ReturnCode.NEXT_COL;
+ }
+
+ // Try to get shadow cell from region
+ final Get get = new Get(CellUtil.cloneRow(v));
+ get.setTimeStamp(v.getTimestamp()).setMaxVersions(1);
+ get.addColumn(CellUtil.cloneFamily(v), CellUtils.addShadowCellSuffix(CellUtils.FAMILY_DELETE_QUALIFIER));
+ Result deleteFamilySC = snapshotFilter.getTableAccessWrapper().get(get);
+
+ if (!deleteFamilySC.isEmpty() &&
+ Bytes.toLong(CellUtil.cloneValue(deleteFamilySC.rawCells()[0] )) < hbaseTransaction.getStartTimestamp()){
+ familyDeletionCache.put(Bytes.toString(CellUtil.cloneFamily(v)), Bytes.toLong(CellUtil.cloneValue(deleteFamilySC.rawCells()[0])));
+ return ReturnCode.NEXT_COL;
+ }
+
+ //At last go to commit table
+ Optional<Long> commitTimestamp = snapshotFilter.tryToLocateCellCommitTimestamp(hbaseTransaction.getEpoch(),
+ v, shadowCellCache);
+ if (commitTimestamp.isPresent() && hbaseTransaction.getStartTimestamp() >= commitTimestamp.get()) {
+ familyDeletionCache.put(Bytes.toString(CellUtil.cloneFamily(v)), commitTimestamp.get());
+ return ReturnCode.NEXT_COL;
+ }
+
+ // Continue getting the next version of the delete family,
+ // until we get one in the snapshot or move to next cell
+ return ReturnCode.SKIP;
+ }
+
+ if (familyDeletionCache.containsKey(Bytes.toString(CellUtil.cloneFamily(v)))
+ && familyDeletionCache.get(Bytes.toString(CellUtil.cloneFamily(v))) >= v.getTimestamp()) {
+ return ReturnCode.NEXT_COL;
+ }
+
+ if (isCellInSnapshot(v)) {
+
+ if (CellUtils.isTombstone(v)) {
+ return ReturnCode.NEXT_COL;
--- End diff --
What about tombstones in snapshot_all? I assume it should be SKIP.
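A small sketch of the difference between the two return codes, for reference. It models only the generic meaning of the codes (SKIP drops the current cell, NEXT_COL drops the current cell and all older versions of the column); the `Code` enum and `scanColumn` are hypothetical, and the sketch takes no position on what snapshot_all actually requires.

```java
import java.util.ArrayList;
import java.util.List;

public class TombstoneReturnCodeSketch {
    public enum Code { INCLUDE, SKIP, NEXT_COL }

    /** Walks the versions of one column, newest first, honoring the return codes. */
    public static List<String> scanColumn(List<String> versions, Code onTombstone) {
        List<String> out = new ArrayList<>();
        for (String v : versions) {
            Code code = v.equals("TOMBSTONE") ? onTombstone : Code.INCLUDE;
            if (code == Code.NEXT_COL) {
                break;      // drop this cell and every older version of the column
            }
            if (code == Code.SKIP) {
                continue;   // drop only this cell, keep looking at older versions
            }
            out.add(v);
        }
        return out;
    }
}
```

For a column holding a tombstone on top of an older value, NEXT_COL yields no cells, while SKIP still yields the older value.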
---
[GitHub] incubator-omid pull request #41: [OMID-102] Support for user Filter when usi...
Posted by ohadshacham <gi...@git.apache.org>.
Github user ohadshacham commented on a diff in the pull request:
https://github.com/apache/incubator-omid/pull/41#discussion_r206860096
--- Diff: hbase-coprocessor/src/main/java/org/apache/omid/transaction/TransactionVisibilityFilter.java ---
@@ -0,0 +1,248 @@
+package org.apache.omid.transaction;
+
+import com.google.common.base.Optional;
+import com.sun.istack.Nullable;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterBase;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import java.io.IOException;
+import java.util.*;
+
+public class TransactionVisibilityFilter extends FilterBase {
+
+ // optional sub-filter to apply to visible cells
+ private final Filter userFilter;
+ private final SnapshotFilterImpl snapshotFilter;
+ private final Map<Long ,Long> shadowCellCache;
+ private final HBaseTransaction hbaseTransaction;
+ private final Map<String, Long> familyDeletionCache;
--- End diff --
I would add a comment that the row info is redundant here, since reset() is called between rows and we clear this map in reset().
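Something along these lines, as a rough standalone sketch. `PerRowCacheSketch` and its method names are made up for illustration; only the map type, the `>=` comparison, and the clear-on-reset behavior are taken from the diff.

```java
import java.util.HashMap;
import java.util.Map;

public class PerRowCacheSketch {
    // Keyed by column family only: the row is implicit because
    // reset() empties the map before the next row is filtered.
    private final Map<String, Long> familyDeletionCache = new HashMap<>();

    public void recordFamilyDelete(String family, long timestamp) {
        familyDeletionCache.put(family, timestamp);
    }

    /** True if a cached family delete covers a cell with the given timestamp. */
    public boolean isDeleted(String family, long cellTimestamp) {
        Long deleteTs = familyDeletionCache.get(family);
        return deleteTs != null && deleteTs >= cellTimestamp;
    }

    /** Called between rows, mirroring the reset() in the diff. */
    public void reset() {
        familyDeletionCache.clear();
    }
}
```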
---