You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by ki...@apache.org on 2016/05/06 16:23:15 UTC
[6/6] incubator-asterixdb git commit: Deadlock-free locking protocol
is enabled
Deadlock-free locking protocol is enabled
- Added EntityCommitProfiler class in TransactionSubsystem.java file:
This profiler takes a report interval (in seconds) parameter and
reports entity level commit count every report interval (in seconds)
only if IS_PROFILE_MODE is set to true. The profiler runs in a separate
thread. However, the profiler thread doesn't start reporting the count
until the entityCommitCount > 0. The profiler can be used to measure
1) IPS (Inserts Per Second) and
2) IIPS (instantaneous IPS) for the every report interval.
Change-Id: Ie58ae2f519baa53599e99b51bd61ea5f8366dafd
Reviewed-on: https://asterix-gerrit.ics.uci.edu/825
Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <hu...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/commit/23be9068
Tree: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/tree/23be9068
Diff: http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/diff/23be9068
Branch: refs/heads/master
Commit: 23be90686bc60b6455bdb7d1ccf3084e14c6cd7d
Parents: 7c15c13
Author: kisskys <ki...@apache.org>
Authored: Thu May 5 23:27:28 2016 -0700
Committer: Young-Seok Kim <ki...@gmail.com>
Committed: Fri May 6 09:22:17 2016 -0700
----------------------------------------------------------------------
.../asterix/optimizer/base/RuleCollections.java | 2 -
.../IntroduceInstantLockSearchCallbackRule.java | 156 --
.../config/AsterixTransactionProperties.java | 8 +
...erixLSMInsertDeleteOperatorNodePushable.java | 50 +-
.../AbstractOperationCallbackFactory.java | 1 -
.../common/transactions/ILockManager.java | 2 -
.../asterix/common/transactions/ILogRecord.java | 1 +
.../asterix/common/transactions/LogRecord.java | 28 +-
.../asterix/common/transactions/LogType.java | 4 +
.../indexing/ExternalFileIndexAccessor.java | 2 +-
...ExternalBTreeSearchOperatorNodePushable.java | 2 +-
...ExternalRTreeSearchOperatorNodePushable.java | 2 +-
.../apache/asterix/metadata/MetadataNode.java | 25 +-
.../asterix/metadata/api/IMetadataNode.java | 75 +-
.../metadata/declared/AqlMetadataProvider.java | 19 +-
...rixLSMPrimaryUpsertOperatorNodePushable.java | 14 +-
.../LockThenSearchOperationCallback.java | 72 +-
.../LockThenSearchOperationCallbackFactory.java | 9 +-
...exInstantSearchOperationCallbackFactory.java | 3 +-
...imaryIndexModificationOperationCallback.java | 52 +-
...dexModificationOperationCallbackFactory.java | 6 +-
...maryIndexSearchOperationCallbackFactory.java | 3 +-
...dexModificationOperationCallbackFactory.java | 4 +-
...daryIndexSearchOperationCallbackFactory.java | 3 +-
...dexModificationOperationCallbackFactory.java | 4 +-
...dexModificationOperationCallbackFactory.java | 4 +-
.../UpsertOperationCallbackFactory.java | 4 +-
.../service/locking/ConcurrentLockManager.java | 204 +-
.../service/locking/DatasetLockInfo.java | 536 ----
.../service/locking/DeadlockDetector.java | 255 --
.../service/locking/DummyLockManager.java | 93 -
.../service/locking/DumpTablePrinter.java | 15 +-
.../service/locking/EntityInfoManager.java | 730 ------
.../service/locking/EntityLockInfoManager.java | 827 -------
.../service/locking/ILockHashTable.java | 39 -
.../management/service/locking/ILockMatrix.java | 48 -
.../management/service/locking/JobInfo.java | 334 ---
.../management/service/locking/LockManager.java | 2289 ------------------
.../LockManagerDeterministicUnitTest.java | 664 -----
.../locking/LockManagerRandomUnitTest.java | 636 -----
.../service/locking/LockRequestTracker.java | 75 -
.../management/service/locking/LockWaiter.java | 149 --
.../service/locking/LockWaiterManager.java | 403 ---
.../service/locking/TimeOutDetector.java | 101 -
.../management/service/logging/LogBuffer.java | 41 +-
.../management/service/logging/LogManager.java | 4 +-
.../logging/LogManagerWithReplication.java | 18 +-
.../service/recovery/RecoveryManager.java | 6 +-
.../TransactionManagementConstants.java | 45 +-
.../service/transaction/TransactionManager.java | 2 +-
.../transaction/TransactionSubsystem.java | 76 +-
.../service/locking/LockManagerUnitTest.java | 124 +-
.../management/service/locking/Locker.java | 39 +-
.../management/service/locking/Request.java | 15 +-
.../storage/am/btree/test/FramewriterTest.java | 2 +-
.../IModificationOperationCallbackFactory.java | 4 +-
.../api/ISearchOperationCallbackFactory.java | 3 +-
...xInsertUpdateDeleteOperatorNodePushable.java | 2 +-
.../IndexSearchOperatorNodePushable.java | 2 +-
...eIndexDiskOrderScanOperatorNodePushable.java | 2 +-
.../impls/NoOpOperationCallbackFactory.java | 10 +-
.../am/lsm/common/api/ILSMIndexFrameWriter.java | 49 +
...xInsertUpdateDeleteOperatorNodePushable.java | 7 +-
.../lsm/common/impls/ConstantMergePolicy.java | 1 -
64 files changed, 638 insertions(+), 7767 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/23be9068/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
index a5fa9a6..814c570 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/RuleCollections.java
@@ -42,7 +42,6 @@ import org.apache.asterix.optimizer.rules.IntroduceAutogenerateIDRule;
import org.apache.asterix.optimizer.rules.IntroduceDynamicTypeCastForExternalFunctionRule;
import org.apache.asterix.optimizer.rules.IntroduceDynamicTypeCastRule;
import org.apache.asterix.optimizer.rules.IntroduceEnforcedListTypeRule;
-import org.apache.asterix.optimizer.rules.IntroduceInstantLockSearchCallbackRule;
import org.apache.asterix.optimizer.rules.IntroduceMaterializationForInsertWithSelfScanRule;
import org.apache.asterix.optimizer.rules.IntroduceRandomPartitioningFeedComputationRule;
import org.apache.asterix.optimizer.rules.IntroduceRapidFrameFlushProjectAssignRule;
@@ -291,7 +290,6 @@ public final class RuleCollections {
physicalRewritesAllLevels.add(new ReplaceSinkOpWithCommitOpRule());
physicalRewritesAllLevels.add(new SetAlgebricksPhysicalOperatorsRule());
physicalRewritesAllLevels.add(new SetAsterixPhysicalOperatorsRule());
- physicalRewritesAllLevels.add(new IntroduceInstantLockSearchCallbackRule());
physicalRewritesAllLevels.add(new AddEquivalenceClassForRecordConstructorRule());
physicalRewritesAllLevels.add(new EnforceStructuralPropertiesRule());
physicalRewritesAllLevels.add(new RemoveSortInFeedIngestionRule());
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/23be9068/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceInstantLockSearchCallbackRule.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceInstantLockSearchCallbackRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceInstantLockSearchCallbackRule.java
deleted file mode 100644
index a3d0640..0000000
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/IntroduceInstantLockSearchCallbackRule.java
+++ /dev/null
@@ -1,156 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.optimizer.rules;
-
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.apache.asterix.algebra.operators.CommitOperator;
-import org.apache.asterix.algebra.operators.physical.BTreeSearchPOperator;
-import org.apache.asterix.metadata.declared.AqlDataSource;
-import org.apache.asterix.metadata.declared.AqlMetadataImplConfig;
-import org.apache.asterix.om.functions.AsterixBuiltinFunctions;
-import org.apache.asterix.optimizer.rules.am.AccessMethodJobGenParams;
-import org.apache.commons.lang3.mutable.Mutable;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.common.utils.Triple;
-import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
-import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
-import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
-import org.apache.hyracks.algebricks.core.algebra.base.IPhysicalOperator;
-import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
-import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
-import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
-import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.ExtensionOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
-import org.apache.hyracks.algebricks.core.algebra.operators.physical.DataSourceScanPOperator;
-import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
-
-public class IntroduceInstantLockSearchCallbackRule implements IAlgebraicRewriteRule {
-
- @Override
- public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
- throws AlgebricksException {
- return false;
- }
-
- private void extractDataSourcesInfo(AbstractLogicalOperator op,
- Map<String, Triple<Integer, LogicalOperatorTag, IPhysicalOperator>> dataSourcesMap) {
-
- for (int i = 0; i < op.getInputs().size(); ++i) {
- AbstractLogicalOperator descendantOp = (AbstractLogicalOperator) op.getInputs().get(i).getValue();
-
- if (descendantOp.getOperatorTag() == LogicalOperatorTag.UNNEST_MAP) {
- UnnestMapOperator unnestMapOp = (UnnestMapOperator) descendantOp;
- ILogicalExpression unnestExpr = unnestMapOp.getExpressionRef().getValue();
- if (unnestExpr.getExpressionTag() == LogicalExpressionTag.FUNCTION_CALL) {
- AbstractFunctionCallExpression f = (AbstractFunctionCallExpression) unnestExpr;
- FunctionIdentifier fid = f.getFunctionIdentifier();
- if (fid.equals(AsterixBuiltinFunctions.EXTERNAL_LOOKUP)) {
- return;
- }
- if (!fid.equals(AsterixBuiltinFunctions.INDEX_SEARCH)) {
- throw new IllegalStateException();
- }
- AccessMethodJobGenParams jobGenParams = new AccessMethodJobGenParams();
- jobGenParams.readFromFuncArgs(f.getArguments());
- boolean isPrimaryIndex = jobGenParams.isPrimaryIndex();
- String indexName = jobGenParams.getIndexName();
- if (isPrimaryIndex) {
- if (dataSourcesMap.containsKey(indexName)) {
- ++(dataSourcesMap.get(indexName).first);
- } else {
- dataSourcesMap.put(indexName, new Triple<Integer, LogicalOperatorTag, IPhysicalOperator>(1,
- LogicalOperatorTag.UNNEST_MAP, unnestMapOp.getPhysicalOperator()));
- }
- }
- }
- } else if (descendantOp.getOperatorTag() == LogicalOperatorTag.DATASOURCESCAN) {
- DataSourceScanOperator dataSourceScanOp = (DataSourceScanOperator) descendantOp;
- String datasourceName = ((AqlDataSource) dataSourceScanOp.getDataSource()).getId().getDatasourceName();
- if (dataSourcesMap.containsKey(datasourceName)) {
- ++(dataSourcesMap.get(datasourceName).first);
- } else {
- dataSourcesMap.put(datasourceName, new Triple<Integer, LogicalOperatorTag, IPhysicalOperator>(1,
- LogicalOperatorTag.DATASOURCESCAN, dataSourceScanOp.getPhysicalOperator()));
- }
- }
- extractDataSourcesInfo(descendantOp, dataSourcesMap);
- }
-
- }
-
- private boolean checkIfRuleIsApplicable(AbstractLogicalOperator op) {
- if (op.getPhysicalOperator() == null) {
- return false;
- }
- if (op.getOperatorTag() == LogicalOperatorTag.EXTENSION_OPERATOR) {
- ExtensionOperator extensionOp = (ExtensionOperator) op;
- if (extensionOp.getDelegate() instanceof CommitOperator) {
- return true;
- }
- }
- if (op.getOperatorTag() == LogicalOperatorTag.DISTRIBUTE_RESULT
- || op.getOperatorTag() == LogicalOperatorTag.WRITE_RESULT) {
- return true;
- }
- return false;
- }
-
- @Override
- public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
- throws AlgebricksException {
-
- AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
-
- if (!checkIfRuleIsApplicable(op)) {
- return false;
- }
- Map<String, Triple<Integer, LogicalOperatorTag, IPhysicalOperator>> dataSourcesMap = new HashMap<String, Triple<Integer, LogicalOperatorTag, IPhysicalOperator>>();
- extractDataSourcesInfo(op, dataSourcesMap);
-
- boolean introducedInstantLock = false;
-
- Iterator<Map.Entry<String, Triple<Integer, LogicalOperatorTag, IPhysicalOperator>>> it = dataSourcesMap
- .entrySet().iterator();
- while (it.hasNext()) {
- Entry<String, Triple<Integer, LogicalOperatorTag, IPhysicalOperator>> entry = it.next();
- Triple<Integer, LogicalOperatorTag, IPhysicalOperator> triple = entry.getValue();
- if (triple.first == 1) {
- AqlMetadataImplConfig aqlMetadataImplConfig = new AqlMetadataImplConfig(true);
- if (triple.second == LogicalOperatorTag.UNNEST_MAP) {
- BTreeSearchPOperator pOperator = (BTreeSearchPOperator) triple.third;
- pOperator.setImplConfig(aqlMetadataImplConfig);
- introducedInstantLock = true;
- } else {
- DataSourceScanPOperator pOperator = (DataSourceScanPOperator) triple.third;
- pOperator.setImplConfig(aqlMetadataImplConfig);
- introducedInstantLock = true;
- }
- }
-
- }
- return introducedInstantLock;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/23be9068/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixTransactionProperties.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixTransactionProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixTransactionProperties.java
index 27b19e2..abc03e2 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixTransactionProperties.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/AsterixTransactionProperties.java
@@ -52,6 +52,9 @@ public class AsterixTransactionProperties extends AbstractAsterixProperties {
private static final String TXN_LOCK_TIMEOUT_SWEEPTHRESHOLD_KEY = "txn.lock.timeout.sweepthreshold";
private static final int TXN_LOCK_TIMEOUT_SWEEPTHRESHOLD_DEFAULT = 10000; // 10s
+ private static final String TXN_COMMIT_PROFILER_REPORT_INTERVAL_KEY = "txn.commitprofiler.reportinterval";
+ private static final int TXN_COMMIT_PROFILER_REPORT_INTERVAL_DEFAULT = 5; // 5 seconds
+
public AsterixTransactionProperties(AsterixPropertiesAccessor accessor) {
super(accessor);
}
@@ -114,4 +117,9 @@ public class AsterixTransactionProperties extends AbstractAsterixProperties {
PropertyInterpreters.getIntegerPropertyInterpreter());
}
+ public int getCommitProfilerReportInterval() {
+ return accessor.getProperty(TXN_COMMIT_PROFILER_REPORT_INTERVAL_KEY,
+ TXN_COMMIT_PROFILER_REPORT_INTERVAL_DEFAULT, PropertyInterpreters.getIntegerPropertyInterpreter());
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/23be9068/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java
index d53236b..5445b11 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java
@@ -28,6 +28,7 @@ import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
@@ -44,6 +45,20 @@ public class AsterixLSMInsertDeleteOperatorNodePushable extends LSMIndexInsertUp
private AbstractLSMIndex lsmIndex;
private int i = 0;
+ /**
+ * The following three variables are used to keep track of the information regarding flushing partial frame such as
+ * 1. whether there was a partial frame flush for the current frame,
+ * ==> captured in flushedPartialTuples variabl
+ * 2. the last flushed tuple index in the frame if there was a partial frame flush,
+ * ==> captured in lastFlushedTupleIdx variable
+ * 3. the current tuple index the frame, where this operator is working on the current tuple.
+ * ==> captured in currentTupleIdx variable
+ * These variables are reset for each frame, i.e., whenever nextFrame() is called, these variables are reset.
+ */
+ private boolean flushedPartialTuples;
+ private int currentTupleIdx;
+ private int lastFlushedTupleIdx;
+
public boolean isPrimary() {
return isPrimary;
}
@@ -60,13 +75,14 @@ public class AsterixLSMInsertDeleteOperatorNodePushable extends LSMIndexInsertUp
RecordDescriptor inputRecDesc = recordDescProvider.getInputRecordDescriptor(opDesc.getActivityId(), 0);
accessor = new FrameTupleAccessor(inputRecDesc);
writeBuffer = new VSizeFrame(ctx);
+ appender = new FrameTupleAppender(writeBuffer);
indexHelper.open();
lsmIndex = (AbstractLSMIndex) indexHelper.getIndexInstance();
try {
writer.open();
modCallback = opDesc.getModificationOpCallbackFactory().createModificationOperationCallback(
indexHelper.getResourcePath(), indexHelper.getResourceID(), indexHelper.getResourcePartition(),
- lsmIndex, ctx);
+ lsmIndex, ctx, this);
indexAccessor = lsmIndex.createAccessor(modCallback, NoOpOperationCallback.INSTANCE);
ITupleFilterFactory tupleFilterFactory = opDesc.getTupleFilterFactory();
if (tupleFilterFactory != null) {
@@ -83,11 +99,15 @@ public class AsterixLSMInsertDeleteOperatorNodePushable extends LSMIndexInsertUp
@Override
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ currentTupleIdx = 0;
+ lastFlushedTupleIdx = 0;
+ flushedPartialTuples = false;
+
accessor.reset(buffer);
ILSMIndexAccessor lsmAccessor = (ILSMIndexAccessor) indexAccessor;
int tupleCount = accessor.getTupleCount();
try {
- for (; i < tupleCount; i++) {
+ for (; i < tupleCount; i++, currentTupleIdx++) {
if (tupleFilter != null) {
frameTuple.reset(accessor, i);
if (!tupleFilter.accept(frameTuple)) {
@@ -120,12 +140,34 @@ public class AsterixLSMInsertDeleteOperatorNodePushable extends LSMIndexInsertUp
FrameDataException fde = new FrameDataException(i, th);
throw fde;
}
+
writeBuffer.ensureFrameSize(buffer.capacity());
- FrameUtils.copyAndFlip(buffer, writeBuffer.getBuffer());
- FrameUtils.flushFrame(writeBuffer.getBuffer(), writer);
+ if (flushedPartialTuples) {
+ flushPartialFrame();
+ } else {
+ FrameUtils.copyAndFlip(buffer, writeBuffer.getBuffer());
+ FrameUtils.flushFrame(writeBuffer.getBuffer(), writer);
+ }
i = 0;
}
+ /**
+ * flushes tuples in a frame from lastFlushedTupleIdx(inclusive) to currentTupleIdx(exclusive)
+ */
+ @Override
+ public void flushPartialFrame() throws HyracksDataException {
+ if (lastFlushedTupleIdx == currentTupleIdx) {
+ //nothing to flush
+ return;
+ }
+ for (int i = lastFlushedTupleIdx; i < currentTupleIdx; i++) {
+ FrameUtils.appendToWriter(writer, appender, accessor, i);
+ }
+ appender.write(writer, true);
+ lastFlushedTupleIdx = currentTupleIdx;
+ flushedPartialTuples = true;
+ }
+
@Override
public void close() throws HyracksDataException {
if (lsmIndex != null) {
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/23be9068/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/AbstractOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/AbstractOperationCallbackFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/AbstractOperationCallbackFactory.java
index 6f54918..d1d869e 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/AbstractOperationCallbackFactory.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/AbstractOperationCallbackFactory.java
@@ -22,7 +22,6 @@ package org.apache.asterix.common.transactions;
import java.io.Serializable;
import org.apache.asterix.common.context.ITransactionSubsystemProvider;
-import org.apache.asterix.common.transactions.JobId;
public abstract class AbstractOperationCallbackFactory implements Serializable {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/23be9068/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILockManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILockManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILockManager.java
index 7909622..495af9a 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILockManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILockManager.java
@@ -68,7 +68,6 @@ public interface ILockManager {
* @param lockMode
* @param txnContext
* @throws ACIDException
- * TODO
* @return
*/
public void unlock(DatasetId datasetId, int entityHashValue, byte lockMode, ITransactionContext txnContext)
@@ -80,7 +79,6 @@ public interface ILockManager {
* @param datasetId
* @param entityHashValue
* @param lockMode
- * TODO
* @param context
* @return
* @throws ACIDException
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/23be9068/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
index 7e27c54..b86aebe 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
@@ -35,6 +35,7 @@ public interface ILogRecord {
public static final int ENTITY_COMMIT_LOG_BASE_SIZE = 30;
public static final int UPDATE_LOG_BASE_SIZE = 59;
public static final int FLUSH_LOG_SIZE = 18;
+ public static final int WAIT_LOG_SIZE = 14;
public LogRecord.RECORD_STATUS readLogRecord(ByteBuffer buffer);
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/23be9068/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
index 003d4c6..2eb8244 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
@@ -65,6 +65,9 @@ import org.apache.hyracks.storage.am.common.tuples.SimpleTupleWriter;
* 3) UPDATE: (Header1(6) + Header2(16) + + Header3(20) + Body(9) + Tail(8)) + PKValueSize + NewValueSize
* --> UPDATE_LOG_BASE_SIZE = 59
* 4) FLUSH: 18 bytes (Header1(6) + DatasetId(4) + Tail(8))
+ * 5) WAIT_LOG_SIZE: 14 bytes (Header1(6) + Tail(8))
+ * --> WAIT_LOG only requires LogType Field, but in order to conform the log reader protocol
+ * it also includes LogSource and JobId fields.
*/
public class LogRecord implements ILogRecord {
@@ -265,7 +268,16 @@ public class LogRecord implements ILogRecord {
logType = buffer.get();
jobId = buffer.getInt();
- if (logType != LogType.FLUSH) {
+ if (logType == LogType.FLUSH) {
+ if (buffer.remaining() < DatasetId.BYTES) {
+ return RECORD_STATUS.TRUNCATED;
+ }
+ datasetId = buffer.getInt();
+ resourceId = 0l;
+ computeAndSetLogSize();
+ } else if (logType == LogType.WAIT) {
+ computeAndSetLogSize();
+ } else {
if (logType == LogType.JOB_COMMIT || logType == LogType.ABORT) {
datasetId = -1;
PKHashValue = -1;
@@ -306,15 +318,8 @@ public class LogRecord implements ILogRecord {
} else {
computeAndSetLogSize();
}
- } else {
- computeAndSetLogSize();
- if (buffer.remaining() < DatasetId.BYTES) {
- return RECORD_STATUS.TRUNCATED;
- }
- datasetId = buffer.getInt();
- resourceId = 0l;
- computeAndSetLogSize();
}
+
return RECORD_STATUS.OK;
}
@@ -412,6 +417,9 @@ public class LogRecord implements ILogRecord {
case LogType.FLUSH:
logSize = FLUSH_LOG_SIZE;
break;
+ case LogType.WAIT:
+ logSize = WAIT_LOG_SIZE;
+ break;
default:
throw new IllegalStateException("Unsupported Log Type");
}
@@ -425,7 +433,7 @@ public class LogRecord implements ILogRecord {
builder.append(" LogType : ").append(LogType.toString(logType));
builder.append(" LogSize : ").append(logSize);
builder.append(" JobId : ").append(jobId);
- if (logType == LogType.ENTITY_COMMIT || logType == LogType.UPDATE) {
+ if (logType == LogType.ENTITY_COMMIT || logType == LogType.UPSERT_ENTITY_COMMIT || logType == LogType.UPDATE) {
builder.append(" DatasetId : ").append(datasetId);
builder.append(" ResourcePartition : ").append(resourcePartition);
builder.append(" PKHashValue : ").append(PKHashValue);
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/23be9068/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogType.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogType.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogType.java
index d6d2657..714b8f7 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogType.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogType.java
@@ -26,6 +26,7 @@ public class LogType {
public static final byte ABORT = 3;
public static final byte FLUSH = 4;
public static final byte UPSERT_ENTITY_COMMIT = 5;
+ public static final byte WAIT = 6;
private static final String STRING_UPDATE = "UPDATE";
private static final String STRING_JOB_COMMIT = "JOB_COMMIT";
@@ -33,6 +34,7 @@ public class LogType {
private static final String STRING_ABORT = "ABORT";
private static final String STRING_FLUSH = "FLUSH";
private static final String STRING_UPSERT_ENTITY_COMMIT = "UPSERT_ENTITY_COMMIT";
+ private static final String STRING_WAIT = "WAIT";
private static final String STRING_INVALID_LOG_TYPE = "INVALID_LOG_TYPE";
@@ -50,6 +52,8 @@ public class LogType {
return STRING_FLUSH;
case LogType.UPSERT_ENTITY_COMMIT:
return STRING_UPSERT_ENTITY_COMMIT;
+ case LogType.WAIT:
+ return STRING_WAIT;
default:
return STRING_INVALID_LOG_TYPE;
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/23be9068/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/ExternalFileIndexAccessor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/ExternalFileIndexAccessor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/ExternalFileIndexAccessor.java
index 5d98961..667bae7 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/ExternalFileIndexAccessor.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/ExternalFileIndexAccessor.java
@@ -92,7 +92,7 @@ public class ExternalFileIndexAccessor implements Serializable {
// create the accessor and the cursor using the passed version
ISearchOperationCallback searchCallback = opDesc.getSearchOpCallbackFactory()
- .createSearchOperationCallback(indexDataflowHelper.getResourceID(), ctx);
+ .createSearchOperationCallback(indexDataflowHelper.getResourceID(), ctx, null);
fileIndexAccessor = index.createAccessor(searchCallback, indexDataflowHelper.getVersion());
fileIndexSearchCursor = fileIndexAccessor.createSearchCursor(false);
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/23be9068/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalBTreeSearchOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalBTreeSearchOperatorNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalBTreeSearchOperatorNodePushable.java
index 0513f9c..9435387 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalBTreeSearchOperatorNodePushable.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalBTreeSearchOperatorNodePushable.java
@@ -73,7 +73,7 @@ public class ExternalBTreeSearchOperatorNodePushable extends BTreeSearchOperator
dos = tb.getDataOutput();
appender = new FrameTupleAppender(new VSizeFrame(ctx));
ISearchOperationCallback searchCallback = opDesc.getSearchOpCallbackFactory()
- .createSearchOperationCallback(indexHelper.getResourceID(), ctx);
+ .createSearchOperationCallback(indexHelper.getResourceID(), ctx, null);
// The next line is the reason we override this method
indexAccessor = externalIndex.createAccessor(searchCallback, dataFlowHelper.getTargetVersion());
cursor = createCursor();
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/23be9068/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalRTreeSearchOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalRTreeSearchOperatorNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalRTreeSearchOperatorNodePushable.java
index 75cc1bf..81e6b17 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalRTreeSearchOperatorNodePushable.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalRTreeSearchOperatorNodePushable.java
@@ -72,7 +72,7 @@ public class ExternalRTreeSearchOperatorNodePushable extends RTreeSearchOperator
dos = tb.getDataOutput();
appender = new FrameTupleAppender(new VSizeFrame(ctx));
ISearchOperationCallback searchCallback = opDesc.getSearchOpCallbackFactory()
- .createSearchOperationCallback(indexHelper.getResourceID(), ctx);
+ .createSearchOperationCallback(indexHelper.getResourceID(), ctx, null);
// The next line is the reason we override this method
indexAccessor = rTreeIndex.createAccessor(searchCallback, rTreeDataflowHelper.getTargetVersion());
cursor = createCursor();
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/23be9068/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
index 542c12d..54ec084 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
@@ -79,7 +79,6 @@ import org.apache.asterix.om.base.AString;
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.om.types.BuiltinType;
import org.apache.asterix.om.types.IAType;
-import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexModificationOperationCallback;
import org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexModificationOperationCallback;
import org.apache.asterix.transaction.management.service.transaction.DatasetIdFactory;
import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
@@ -306,15 +305,13 @@ public class MetadataNode implements IMetadataNode {
IMetadataIndex metadataIndex, ILSMIndex lsmIndex, IndexOperation indexOp) throws ACIDException {
ITransactionContext txnCtx = transactionSubsystem.getTransactionManager().getTransactionContext(jobId, false);
- if (metadataIndex.isPrimaryIndex()) {
- return new PrimaryIndexModificationOperationCallback(metadataIndex.getDatasetId().getId(),
- metadataIndex.getPrimaryKeyIndexes(), txnCtx, transactionSubsystem.getLockManager(),
- transactionSubsystem, resourceId, metadataStoragePartition, ResourceType.LSM_BTREE, indexOp);
- } else {
- return new SecondaryIndexModificationOperationCallback(metadataIndex.getDatasetId().getId(),
- metadataIndex.getPrimaryKeyIndexes(), txnCtx, transactionSubsystem.getLockManager(),
- transactionSubsystem, resourceId, metadataStoragePartition, ResourceType.LSM_BTREE, indexOp);
- }
+ // Regardless of the index type (primary or secondary index), secondary index modification callback is given
+ // This is still correct since metadata index operation doesn't require any lock from ConcurrentLockMgr and
+ // The difference between primaryIndexModCallback and secondaryIndexModCallback is that primary index requires
+ // locks and secondary index doesn't.
+ return new SecondaryIndexModificationOperationCallback(metadataIndex.getDatasetId().getId(),
+ metadataIndex.getPrimaryKeyIndexes(), txnCtx, transactionSubsystem.getLockManager(),
+ transactionSubsystem, resourceId, metadataStoragePartition, ResourceType.LSM_BTREE, indexOp);
}
@Override
@@ -980,10 +977,8 @@ public class MetadataNode implements IMetadataNode {
try {
while (rangeCursor.hasNext()) {
rangeCursor.next();
- sb.append(TupleUtils.printTuple(rangeCursor.getTuple(),
- new ISerializerDeserializer[] {
- AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(
- BuiltinType.ASTRING),
+ sb.append(TupleUtils.printTuple(rangeCursor.getTuple(), new ISerializerDeserializer[] {
+ AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ASTRING),
AqlSerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ASTRING),
AqlSerializerDeserializerProvider.INSTANCE
.getSerializerDeserializer(BuiltinType.ASTRING) }));
@@ -1000,7 +995,7 @@ public class MetadataNode implements IMetadataNode {
private <ResultType> void searchIndex(JobId jobId, IMetadataIndex index, ITupleReference searchKey,
IValueExtractor<ResultType> valueExtractor, List<ResultType> results)
- throws MetadataException, IndexException, IOException {
+ throws MetadataException, IndexException, IOException {
IBinaryComparatorFactory[] comparatorFactories = index.getKeyBinaryComparatorFactory();
String resourceName = index.getFile().toString();
IIndex indexInstance = datasetLifecycleManager.getIndex(resourceName);
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/23be9068/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java
index 7070a88..ec2e692 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java
@@ -148,8 +148,8 @@ public interface IMetadataNode extends Remote, Serializable {
* @throws MetadataException
* For example, if the dataverse does not exist. RemoteException
*/
- public List<Dataset> getDataverseDatasets(JobId jobId, String dataverseName) throws MetadataException,
- RemoteException;
+ public List<Dataset> getDataverseDatasets(JobId jobId, String dataverseName)
+ throws MetadataException, RemoteException;
/**
* Deletes the dataverse with given name, and all it's associated datasets,
@@ -194,8 +194,8 @@ public interface IMetadataNode extends Remote, Serializable {
* For example, if the dataset does not exist.
* @throws RemoteException
*/
- public Dataset getDataset(JobId jobId, String dataverseName, String datasetName) throws MetadataException,
- RemoteException;
+ public Dataset getDataset(JobId jobId, String dataverseName, String datasetName)
+ throws MetadataException, RemoteException;
/**
* Retrieves all indexes of a dataset, acquiring local locks on behalf of
@@ -229,8 +229,8 @@ public interface IMetadataNode extends Remote, Serializable {
* For example, if the dataset and/or dataverse does not exist.
* @throws RemoteException
*/
- public void dropDataset(JobId jobId, String dataverseName, String datasetName) throws MetadataException,
- RemoteException;
+ public void dropDataset(JobId jobId, String dataverseName, String datasetName)
+ throws MetadataException, RemoteException;
/**
* Inserts an index into the metadata, acquiring local locks on behalf of
@@ -313,8 +313,8 @@ public interface IMetadataNode extends Remote, Serializable {
* For example, if the datatype does not exist.
* @throws RemoteException
*/
- public Datatype getDatatype(JobId jobId, String dataverseName, String datatypeName) throws MetadataException,
- RemoteException;
+ public Datatype getDatatype(JobId jobId, String dataverseName, String datatypeName)
+ throws MetadataException, RemoteException;
/**
* Deletes the given datatype in given dataverse, acquiring local locks on
@@ -331,8 +331,8 @@ public interface IMetadataNode extends Remote, Serializable {
* deleted.
* @throws RemoteException
*/
- public void dropDatatype(JobId jobId, String dataverseName, String datatypeName) throws MetadataException,
- RemoteException;
+ public void dropDatatype(JobId jobId, String dataverseName, String datatypeName)
+ throws MetadataException, RemoteException;
/**
* Inserts a node group, acquiring local locks on behalf of the given
@@ -400,8 +400,8 @@ public interface IMetadataNode extends Remote, Serializable {
* @throws MetadataException
* @throws RemoteException
*/
- public Function getFunction(JobId jobId, FunctionSignature functionSignature) throws MetadataException,
- RemoteException;
+ public Function getFunction(JobId jobId, FunctionSignature functionSignature)
+ throws MetadataException, RemoteException;
/**
* Deletes a function, acquiring local locks on behalf of the given
@@ -416,8 +416,8 @@ public interface IMetadataNode extends Remote, Serializable {
* group to be deleted.
* @throws RemoteException
*/
- public void dropFunction(JobId jobId, FunctionSignature functionSignature) throws MetadataException,
- RemoteException;
+ public void dropFunction(JobId jobId, FunctionSignature functionSignature)
+ throws MetadataException, RemoteException;
/**
* @param jobId
@@ -438,8 +438,8 @@ public interface IMetadataNode extends Remote, Serializable {
* @throws MetadataException
* @throws RemoteException
*/
- public List<Function> getDataverseFunctions(JobId jobId, String dataverseName) throws MetadataException,
- RemoteException;
+ public List<Function> getDataverseFunctions(JobId jobId, String dataverseName)
+ throws MetadataException, RemoteException;
/**
* @param ctx
@@ -448,8 +448,8 @@ public interface IMetadataNode extends Remote, Serializable {
* @throws MetadataException
* @throws RemoteException
*/
- public List<DatasourceAdapter> getDataverseAdapters(JobId jobId, String dataverseName) throws MetadataException,
- RemoteException;
+ public List<DatasourceAdapter> getDataverseAdapters(JobId jobId, String dataverseName)
+ throws MetadataException, RemoteException;
/**
* @param jobId
@@ -475,8 +475,8 @@ public interface IMetadataNode extends Remote, Serializable {
* if the adapter does not exists.
* @throws RemoteException
*/
- public void dropAdapter(JobId jobId, String dataverseName, String adapterName) throws MetadataException,
- RemoteException;
+ public void dropAdapter(JobId jobId, String dataverseName, String adapterName)
+ throws MetadataException, RemoteException;
/**
* @param jobId
@@ -495,8 +495,8 @@ public interface IMetadataNode extends Remote, Serializable {
* @throws MetadataException
* @throws RemoteException
*/
- public void addCompactionPolicy(JobId jobId, CompactionPolicy compactionPolicy) throws MetadataException,
- RemoteException;
+ public void addCompactionPolicy(JobId jobId, CompactionPolicy compactionPolicy)
+ throws MetadataException, RemoteException;
/**
* @param jobId
@@ -506,8 +506,8 @@ public interface IMetadataNode extends Remote, Serializable {
* @throws MetadataException
* @throws RemoteException
*/
- public CompactionPolicy getCompactionPolicy(JobId jobId, String dataverse, String policy) throws MetadataException,
- RemoteException;
+ public CompactionPolicy getCompactionPolicy(JobId jobId, String dataverse, String policy)
+ throws MetadataException, RemoteException;
/**
* @param jobId
@@ -550,7 +550,6 @@ public interface IMetadataNode extends Remote, Serializable {
*/
public void dropFeed(JobId jobId, String dataverse, String feedName) throws MetadataException, RemoteException;
-
/**
* @param jobId
* @param feedPolicy
@@ -567,9 +566,8 @@ public interface IMetadataNode extends Remote, Serializable {
* @throws MetadataException
* @throws RemoteException
*/
- public FeedPolicyEntity getFeedPolicy(JobId jobId, String dataverse, String policy) throws MetadataException,
- RemoteException;
-
+ public FeedPolicyEntity getFeedPolicy(JobId jobId, String dataverse, String policy)
+ throws MetadataException, RemoteException;
/**
* Removes a library , acquiring local locks on behalf of the given
@@ -584,8 +582,8 @@ public interface IMetadataNode extends Remote, Serializable {
* if the library does not exists.
* @throws RemoteException
*/
- public void dropLibrary(JobId jobId, String dataverseName, String libraryName) throws MetadataException,
- RemoteException;
+ public void dropLibrary(JobId jobId, String dataverseName, String libraryName)
+ throws MetadataException, RemoteException;
/**
* Adds a library, acquiring local locks on behalf of the given
@@ -612,8 +610,8 @@ public interface IMetadataNode extends Remote, Serializable {
* @throws MetadataException
* @throws RemoteException
*/
- public Library getLibrary(JobId jobId, String dataverseName, String libraryName) throws MetadataException,
- RemoteException;
+ public Library getLibrary(JobId jobId, String dataverseName, String libraryName)
+ throws MetadataException, RemoteException;
/**
* Retireve libraries installed in a given dataverse.
@@ -626,8 +624,8 @@ public interface IMetadataNode extends Remote, Serializable {
* @throws MetadataException
* @throws RemoteException
*/
- public List<Library> getDataverseLibraries(JobId jobId, String dataverseName) throws MetadataException,
- RemoteException;
+ public List<Library> getDataverseLibraries(JobId jobId, String dataverseName)
+ throws MetadataException, RemoteException;
/**
* @param jobId
@@ -648,8 +646,8 @@ public interface IMetadataNode extends Remote, Serializable {
* @throws RemoteException
* @throws MetadataException
*/
- public void dropFeedPolicy(JobId jobId, String dataverseName, String policyName) throws MetadataException,
- RemoteException;
+ public void dropFeedPolicy(JobId jobId, String dataverseName, String policyName)
+ throws MetadataException, RemoteException;
/**
* @param jobId
@@ -658,8 +656,8 @@ public interface IMetadataNode extends Remote, Serializable {
* @throws MetadataException
* @throws RemoteException
*/
- public List<FeedPolicyEntity> getDataversePolicies(JobId jobId, String dataverse) throws MetadataException,
- RemoteException;
+ public List<FeedPolicyEntity> getDataversePolicies(JobId jobId, String dataverse)
+ throws MetadataException, RemoteException;
/**
* @param jobId
@@ -731,7 +729,6 @@ public interface IMetadataNode extends Remote, Serializable {
public ExternalFile getExternalFile(JobId jobId, String dataverseName, String datasetName, Integer fileNumber)
throws MetadataException, RemoteException;
-
/**
* update an existing dataset in the metadata, acquiring local locks on behalf
* of the given transaction id.
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/23be9068/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
index 9fe72a4..5e2e227 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/AqlMetadataProvider.java
@@ -110,7 +110,6 @@ import org.apache.asterix.transaction.management.opcallbacks.LockThenSearchOpera
import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexInstantSearchOperationCallbackFactory;
import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexModificationOperationCallbackFactory;
import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexOperationTrackerProvider;
-import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexSearchOperationCallbackFactory;
import org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexModificationOperationCallbackFactory;
import org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexOperationTrackerProvider;
import org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexSearchOperationCallbackFactory;
@@ -730,17 +729,15 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
primaryKeyFields[i] = i;
}
- AqlMetadataImplConfig aqlMetadataImplConfig = (AqlMetadataImplConfig) implConfig;
ITransactionSubsystemProvider txnSubsystemProvider = new TransactionSubsystemProvider();
- if (aqlMetadataImplConfig != null && aqlMetadataImplConfig.isInstantLock()) {
- searchCallbackFactory = temp ? NoOpOperationCallbackFactory.INSTANCE
- : new PrimaryIndexInstantSearchOperationCallbackFactory(jobId, datasetId, primaryKeyFields,
- txnSubsystemProvider, ResourceType.LSM_BTREE);
- } else {
- searchCallbackFactory = temp ? NoOpOperationCallbackFactory.INSTANCE
- : new PrimaryIndexSearchOperationCallbackFactory(jobId, datasetId, primaryKeyFields,
- txnSubsystemProvider, ResourceType.LSM_BTREE);
- }
+
+ /**
+ * Due to the read-committed isolation level,
+ * we may acquire very short duration lock(i.e., instant lock) for readers.
+ */
+ searchCallbackFactory = temp ? NoOpOperationCallbackFactory.INSTANCE
+ : new PrimaryIndexInstantSearchOperationCallbackFactory(jobId, datasetId, primaryKeyFields,
+ txnSubsystemProvider, ResourceType.LSM_BTREE);
}
Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils
.getMergePolicyFactory(dataset, mdTxnCtx);
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/23be9068/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMPrimaryUpsertOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMPrimaryUpsertOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMPrimaryUpsertOperatorNodePushable.java
index 83c6e34..7785978 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMPrimaryUpsertOperatorNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMPrimaryUpsertOperatorNodePushable.java
@@ -128,10 +128,10 @@ public class AsterixLSMPrimaryUpsertOperatorNodePushable extends LSMIndexInsertU
appender = new FrameTupleAppender(new VSizeFrame(ctx), true);
modCallback = opDesc.getModificationOpCallbackFactory().createModificationOperationCallback(
indexHelper.getResourcePath(), indexHelper.getResourceID(), indexHelper.getResourcePartition(),
- index, ctx);
+ index, ctx, this);
indexAccessor = index.createAccessor(modCallback, opDesc.getSearchOpCallbackFactory()
- .createSearchOperationCallback(indexHelper.getResourceID(), ctx));
+ .createSearchOperationCallback(indexHelper.getResourceID(), ctx, this));
cursor = indexAccessor.createSearchCursor(false);
frameTuple = new FrameTupleReference();
IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) ctx.getJobletContext()
@@ -191,6 +191,7 @@ public class AsterixLSMPrimaryUpsertOperatorNodePushable extends LSMIndexInsertU
//TODO: use tryDelete/tryInsert in order to prevent deadlocks
@Override
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+
accessor.reset(buffer);
LSMTreeIndexAccessor lsmAccessor = (LSMTreeIndexAccessor) indexAccessor;
int tupleCount = accessor.getTupleCount();
@@ -239,6 +240,15 @@ public class AsterixLSMPrimaryUpsertOperatorNodePushable extends LSMIndexInsertU
}
}
+ /**
+ * Flushes tuples (which have already been written to tuple appender's buffer in writeOutput() method)
+ * to the next operator/consumer.
+ */
+ @Override
+ public void flushPartialFrame() throws HyracksDataException {
+ appender.write(writer, true);
+ }
+
private ITupleReference getPrevTupleWithFilter(ITupleReference prevTuple) throws IOException, AsterixException {
prevRecWithPKWithFilterValue.reset();
for (int i = 0; i < prevTuple.getFieldCount(); i++) {
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/23be9068/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallback.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallback.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallback.java
index 49cea94..ef3b218 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallback.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallback.java
@@ -20,18 +20,40 @@ package org.apache.asterix.transaction.management.opcallbacks;
import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.transactions.AbstractOperationCallback;
-import org.apache.asterix.common.transactions.ILockManager;
+import org.apache.asterix.common.transactions.ILogManager;
+import org.apache.asterix.common.transactions.ILogRecord;
import org.apache.asterix.common.transactions.ITransactionContext;
+import org.apache.asterix.common.transactions.ITransactionSubsystem;
+import org.apache.asterix.common.transactions.LogRecord;
+import org.apache.asterix.common.transactions.LogSource;
+import org.apache.asterix.common.transactions.LogType;
import org.apache.asterix.transaction.management.service.transaction.TransactionManagementConstants.LockManagerConstants.LockMode;
+import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
import org.apache.hyracks.storage.am.common.api.ISearchOperationCallback;
+import org.apache.hyracks.storage.am.lsm.common.dataflow.LSMIndexInsertUpdateDeleteOperatorNodePushable;
public class LockThenSearchOperationCallback extends AbstractOperationCallback implements ISearchOperationCallback {
- public LockThenSearchOperationCallback(int datasetId, int[] entityIdFields, ILockManager lockManager,
- ITransactionContext txnCtx) {
- super(datasetId, entityIdFields, txnCtx, lockManager);
+ /**
+ * variables used for deadlock-free locking protocol
+ */
+ private final LSMIndexInsertUpdateDeleteOperatorNodePushable operatorNodePushable;
+ private final ILogManager logManager;
+ private final ILogRecord logRecord;
+
+ public LockThenSearchOperationCallback(int datasetId, int[] entityIdFields, ITransactionSubsystem txnSubsystem,
+ ITransactionContext txnCtx, IOperatorNodePushable operatorNodePushable) {
+ super(datasetId, entityIdFields, txnCtx, txnSubsystem.getLockManager());
+ this.operatorNodePushable = (LSMIndexInsertUpdateDeleteOperatorNodePushable) operatorNodePushable;
+ this.logManager = txnSubsystem.getLogManager();
+ this.logRecord = new LogRecord();
+ logRecord.setTxnCtx(txnCtx);
+ logRecord.setLogSource(LogSource.LOCAL);
+ logRecord.setLogType(LogType.WAIT);
+ logRecord.setJobId(txnCtx.getJobId().getId());
+ logRecord.computeAndSetLogSize();
}
@Override
@@ -55,9 +77,49 @@ public class LockThenSearchOperationCallback extends AbstractOperationCallback i
public void before(ITupleReference tuple) throws HyracksDataException {
int pkHash = computePrimaryKeyHashValue(tuple, primaryKeyFields);
try {
- lockManager.lock(datasetId, pkHash, LockMode.X, txnCtx);
+ if (operatorNodePushable != null) {
+
+ /**********************************************************************************
+ * In order to achieve deadlock-free locking protocol during any write (insert/delete/upsert) operations,
+ * the following logic is implemented.
+ * See https://cwiki.apache.org/confluence/display/ASTERIXDB/Deadlock-Free+Locking+Protocol for more details.
+ * 1. for each entry in a frame
+ * 2. returnValue = tryLock() for an entry
+ * 3. if returnValue == false
+ * 3-1. flush all entries (which already acquired locks) to the next operator
+ * : this will make all those entries reach commit operator so that corresponding commit logs will be created.
+ * 3-2. create a WAIT log and wait until logFlusher thread will flush the WAIT log and gives notification
+ * : this notification guarantees that all locks acquired by this transactor (or all locks acquired for the entries)
+ * were released.
+ * 3-3. acquire lock using lock() instead of tryLock() for the failed entry
+ * : we know for sure this lock call will not cause deadlock since the transactor doesn't hold any other locks.
+ * 4. create an update log and insert the entry
+ * From the above logic, step 2 and 3 are implemented in this before() method.
+ **********************/
+
+ //release all locks held by this actor (which is a thread) by flushing partial frame.
+ boolean tryLockSucceed = lockManager.tryLock(datasetId, pkHash, LockMode.X, txnCtx);
+ if (!tryLockSucceed) {
+ //flush entries which have been inserted already to release locks hold by them
+ operatorNodePushable.flushPartialFrame();
+
+ //create WAIT log and wait until the WAIT log is flushed and notified by LogFlusher thread
+ logWait();
+
+ //acquire lock
+ lockManager.lock(datasetId, pkHash, LockMode.X, txnCtx);
+ }
+
+ } else {
+ //operatorNodePushable can be null when metadata node operation is executed
+ lockManager.lock(datasetId, pkHash, LockMode.X, txnCtx);
+ }
} catch (ACIDException e) {
throw new HyracksDataException(e);
}
}
+
+ private void logWait() throws ACIDException {
+ logManager.log(logRecord);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/23be9068/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallbackFactory.java
index 6bfb6cd..0d65c16 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallbackFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/LockThenSearchOperationCallbackFactory.java
@@ -25,6 +25,7 @@ import org.apache.asterix.common.transactions.ITransactionContext;
import org.apache.asterix.common.transactions.ITransactionSubsystem;
import org.apache.asterix.common.transactions.JobId;
import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.common.api.ISearchOperationCallback;
import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
@@ -40,13 +41,13 @@ public class LockThenSearchOperationCallbackFactory extends AbstractOperationCal
}
@Override
- public ISearchOperationCallback createSearchOperationCallback(long resourceId, IHyracksTaskContext ctx)
- throws HyracksDataException {
+ public ISearchOperationCallback createSearchOperationCallback(long resourceId, IHyracksTaskContext ctx,
+ IOperatorNodePushable operatorNodePushable) throws HyracksDataException {
ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx);
try {
ITransactionContext txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(jobId, false);
- return new LockThenSearchOperationCallback(datasetId, primaryKeyFields, txnSubsystem.getLockManager(),
- txnCtx);
+ return new LockThenSearchOperationCallback(datasetId, primaryKeyFields, txnSubsystem, txnCtx,
+ operatorNodePushable);
} catch (ACIDException e) {
throw new HyracksDataException(e);
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/23be9068/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallbackFactory.java
index 9b14807..ba97287 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallbackFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexInstantSearchOperationCallbackFactory.java
@@ -26,6 +26,7 @@ import org.apache.asterix.common.transactions.ITransactionContext;
import org.apache.asterix.common.transactions.ITransactionSubsystem;
import org.apache.asterix.common.transactions.JobId;
import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.common.api.ISearchOperationCallback;
import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
@@ -41,7 +42,7 @@ public class PrimaryIndexInstantSearchOperationCallbackFactory extends AbstractO
}
@Override
- public ISearchOperationCallback createSearchOperationCallback(long resourceId, IHyracksTaskContext ctx)
+ public ISearchOperationCallback createSearchOperationCallback(long resourceId, IHyracksTaskContext ctx, IOperatorNodePushable operatorNodePushable)
throws HyracksDataException {
ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx);
try {
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/23be9068/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallback.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallback.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallback.java
index 780f294..4bde490 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallback.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallback.java
@@ -19,11 +19,14 @@
package org.apache.asterix.transaction.management.opcallbacks;
+import org.apache.asterix.common.dataflow.AsterixLSMInsertDeleteOperatorNodePushable;
import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.transactions.ILockManager;
import org.apache.asterix.common.transactions.ITransactionContext;
import org.apache.asterix.common.transactions.ITransactionSubsystem;
+import org.apache.asterix.common.transactions.LogType;
import org.apache.asterix.transaction.management.service.transaction.TransactionManagementConstants.LockManagerConstants.LockMode;
+import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
import org.apache.hyracks.storage.am.common.api.IModificationOperationCallback;
@@ -36,18 +39,57 @@ import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
public class PrimaryIndexModificationOperationCallback extends AbstractIndexModificationOperationCallback
implements IModificationOperationCallback {
+ private final AsterixLSMInsertDeleteOperatorNodePushable operatorNodePushable;
+
public PrimaryIndexModificationOperationCallback(int datasetId, int[] primaryKeyFields, ITransactionContext txnCtx,
ILockManager lockManager, ITransactionSubsystem txnSubsystem, long resourceId, int resourcePartition,
- byte resourceType, IndexOperation indexOp) {
+ byte resourceType, IndexOperation indexOp, IOperatorNodePushable operatorNodePushable) {
super(datasetId, primaryKeyFields, txnCtx, lockManager, txnSubsystem, resourceId, resourcePartition,
resourceType, indexOp);
+ this.operatorNodePushable = (AsterixLSMInsertDeleteOperatorNodePushable) operatorNodePushable;
}
@Override
public void before(ITupleReference tuple) throws HyracksDataException {
int pkHash = computePrimaryKeyHashValue(tuple, primaryKeyFields);
try {
- lockManager.lock(datasetId, pkHash, LockMode.X, txnCtx);
+ if (operatorNodePushable != null) {
+
+ /**********************************************************************************
+ * In order to achieve deadlock-free locking protocol during any write (insert/delete/upsert) operations,
+ * the following logic is implemented.
+ * See https://cwiki.apache.org/confluence/display/ASTERIXDB/Deadlock-Free+Locking+Protocol for more details.
+ * 1. for each entry in a frame
+ * 2. returnValue = tryLock() for an entry
+ * 3. if returnValue == false
+ * 3-1. flush all entries (which already acquired locks) to the next operator
+ * : this will make all those entries reach commit operator so that corresponding commit logs will be created.
+ * 3-2. create a WAIT log and wait until logFlusher thread will flush the WAIT log and gives notification
+ * : this notification guarantees that all locks acquired by this transactor (or all locks acquired for the entries)
+ * were released.
+ * 3-3. acquire lock using lock() instead of tryLock() for the failed entry
+ * : we know for sure this lock call will not cause deadlock since the transactor doesn't hold any other locks.
+ * 4. create an update log and insert the entry
+ * From the above logic, step 2 and 3 are implemented in this before() method.
+ **********************/
+
+ //release all locks held by this actor (which is a thread) by flushing partial frame.
+ boolean tryLockSucceed = lockManager.tryLock(datasetId, pkHash, LockMode.X, txnCtx);
+ if (!tryLockSucceed) {
+ //flush entries which have been inserted already to release locks hold by them
+ operatorNodePushable.flushPartialFrame();
+
+ //create WAIT log and wait until the WAIT log is flushed and notified by LogFlusher thread
+ logWait();
+
+ //acquire lock
+ lockManager.lock(datasetId, pkHash, LockMode.X, txnCtx);
+ }
+
+ } else {
+ //operatorNodePushable can be null when metadata node operation is executed
+ lockManager.lock(datasetId, pkHash, LockMode.X, txnCtx);
+ }
} catch (ACIDException e) {
throw new HyracksDataException(e);
}
@@ -62,4 +104,10 @@ public class PrimaryIndexModificationOperationCallback extends AbstractIndexModi
throw new HyracksDataException(e);
}
}
+
+ private void logWait() throws ACIDException {
+ logRecord.setLogType(LogType.WAIT);
+ logRecord.computeAndSetLogSize();
+ txnSubsystem.getLogManager().log(logRecord);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/23be9068/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
index db68b26..c406812 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
@@ -27,6 +27,7 @@ import org.apache.asterix.common.transactions.ITransactionContext;
import org.apache.asterix.common.transactions.ITransactionSubsystem;
import org.apache.asterix.common.transactions.JobId;
import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManager;
import org.apache.hyracks.storage.am.common.api.IModificationOperationCallback;
@@ -51,7 +52,8 @@ public class PrimaryIndexModificationOperationCallbackFactory extends AbstractOp
@Override
public IModificationOperationCallback createModificationOperationCallback(String resourcePath, long resourceId,
- int resourcePartition, Object resource, IHyracksTaskContext ctx) throws HyracksDataException {
+ int resourcePartition, Object resource, IHyracksTaskContext ctx, IOperatorNodePushable operatorNodePushable)
+ throws HyracksDataException {
ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx);
IIndexLifecycleManager indexLifeCycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
@@ -65,7 +67,7 @@ public class PrimaryIndexModificationOperationCallbackFactory extends AbstractOp
ITransactionContext txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(jobId, false);
IModificationOperationCallback modCallback = new PrimaryIndexModificationOperationCallback(datasetId,
primaryKeyFields, txnCtx, txnSubsystem.getLockManager(), txnSubsystem, resourceId,
- resourcePartition, resourceType, indexOp);
+ resourcePartition, resourceType, indexOp, operatorNodePushable);
txnCtx.registerIndexAndCallback(resourceId, index, (AbstractOperationCallback) modCallback, true);
return modCallback;
} catch (ACIDException e) {
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/23be9068/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallbackFactory.java
index b483674..1dd8368 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallbackFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexSearchOperationCallbackFactory.java
@@ -26,6 +26,7 @@ import org.apache.asterix.common.transactions.ITransactionContext;
import org.apache.asterix.common.transactions.ITransactionSubsystem;
import org.apache.asterix.common.transactions.JobId;
import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.common.api.ISearchOperationCallback;
import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
@@ -41,7 +42,7 @@ public class PrimaryIndexSearchOperationCallbackFactory extends AbstractOperatio
}
@Override
- public ISearchOperationCallback createSearchOperationCallback(long resourceId, IHyracksTaskContext ctx)
+ public ISearchOperationCallback createSearchOperationCallback(long resourceId, IHyracksTaskContext ctx, IOperatorNodePushable operatorNodePushable)
throws HyracksDataException {
ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx);
try {
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/23be9068/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
index ef2b498..168da99 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
@@ -27,6 +27,7 @@ import org.apache.asterix.common.transactions.ITransactionContext;
import org.apache.asterix.common.transactions.ITransactionSubsystem;
import org.apache.asterix.common.transactions.JobId;
import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManager;
import org.apache.hyracks.storage.am.common.api.IModificationOperationCallback;
@@ -48,7 +49,8 @@ public class SecondaryIndexModificationOperationCallbackFactory extends Abstract
@Override
public IModificationOperationCallback createModificationOperationCallback(String resourcePath, long resourceId,
- int resourcePartition, Object resource, IHyracksTaskContext ctx) throws HyracksDataException {
+ int resourcePartition, Object resource, IHyracksTaskContext ctx, IOperatorNodePushable operatorNodePushable)
+ throws HyracksDataException {
ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx);
IIndexLifecycleManager indexLifeCycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
.getDatasetLifecycleManager();
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/23be9068/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexSearchOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexSearchOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexSearchOperationCallbackFactory.java
index 4e1ee63..5dfdcdc 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexSearchOperationCallbackFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexSearchOperationCallbackFactory.java
@@ -20,6 +20,7 @@
package org.apache.asterix.transaction.management.opcallbacks;
import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.common.api.ISearchOperationCallback;
import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
@@ -29,7 +30,7 @@ public class SecondaryIndexSearchOperationCallbackFactory implements ISearchOper
private static final long serialVersionUID = 1L;
@Override
- public ISearchOperationCallback createSearchOperationCallback(long resourceId, IHyracksTaskContext ctx)
+ public ISearchOperationCallback createSearchOperationCallback(long resourceId, IHyracksTaskContext ctx, IOperatorNodePushable operatorNodePushable)
throws HyracksDataException {
return new SecondaryIndexSearchOperationCallback();
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/23be9068/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java
index b08798c..8c91c1a 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java
@@ -27,6 +27,7 @@ import org.apache.asterix.common.transactions.ITransactionContext;
import org.apache.asterix.common.transactions.ITransactionSubsystem;
import org.apache.asterix.common.transactions.JobId;
import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManager;
import org.apache.hyracks.storage.am.common.api.IModificationOperationCallback;
@@ -49,7 +50,8 @@ public class TempDatasetPrimaryIndexModificationOperationCallbackFactory extends
@Override
public IModificationOperationCallback createModificationOperationCallback(String resourcePath, long resourceId,
- int resourcePartition, Object resource, IHyracksTaskContext ctx) throws HyracksDataException {
+ int resourcePartition, Object resource, IHyracksTaskContext ctx, IOperatorNodePushable operatorNodePushable)
+ throws HyracksDataException {
ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx);
IIndexLifecycleManager indexLifeCycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
.getDatasetLifecycleManager();
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/23be9068/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java
index 403d68d..3e11531 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java
@@ -27,6 +27,7 @@ import org.apache.asterix.common.transactions.ITransactionContext;
import org.apache.asterix.common.transactions.ITransactionSubsystem;
import org.apache.asterix.common.transactions.JobId;
import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManager;
import org.apache.hyracks.storage.am.common.api.IModificationOperationCallback;
@@ -49,7 +50,8 @@ public class TempDatasetSecondaryIndexModificationOperationCallbackFactory exten
@Override
public IModificationOperationCallback createModificationOperationCallback(String resourcePath, long resourceId,
- int resourcePartition, Object resource, IHyracksTaskContext ctx) throws HyracksDataException {
+ int resourcePartition, Object resource, IHyracksTaskContext ctx, IOperatorNodePushable operatorNodePushable)
+ throws HyracksDataException {
ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx);
IIndexLifecycleManager indexLifeCycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
.getDatasetLifecycleManager();