You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by im...@apache.org on 2018/02/02 22:10:58 UTC
[2/2] asterixdb git commit: [ASTERIXDB-2231][STO] Separate primary op
tracker for each partition
[ASTERIXDB-2231][STO] Separate primary op tracker for each partition
- user model changes: no
- storage format changes: no
- interface changes: yes
Details:
- Use a separate primary index operation tracker for each partition, instead
of having a global one on each NC, to achieve better scalability.
- As a coordinated change, use a separate component id generator for each
partition as well.
- Add partition to transaction context so that transaction operations
can operate on the proper op tracker.
- Fixes [ASTERIXDB-2232] to calculate dataset partitions correctly.
Change-Id: I9eb3854d2343e45beeccb87b0d434e5f4efd69c9
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2263
Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
Reviewed-by: abdullah alamoudi <ba...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/2f934e31
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/2f934e31
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/2f934e31
Branch: refs/heads/master
Commit: 2f934e312e841223aa0e4941e01f23b9e63fdc18
Parents: f94fdcc
Author: luochen01 <cl...@uci.edu>
Authored: Thu Feb 1 09:17:23 2018 -0800
Committer: abdullah alamoudi <ba...@gmail.com>
Committed: Thu Feb 1 23:35:43 2018 -0800
----------------------------------------------------------------------
.../asterix/app/nc/NCAppRuntimeContext.java | 22 +--
.../test/dataflow/ComponentRollbackTest.java | 14 +-
.../dataflow/MultiPartitionLSMIndexTest.java | 5 +-
.../TestLsmBtreeIoOpCallbackFactory.java | 3 +-
.../TestPrimaryIndexOperationTracker.java | 4 +-
...TestPrimaryIndexOperationTrackerFactory.java | 29 ++-
.../asterix/test/metadata/MetadataTxnTest.java | 7 +-
.../apache/asterix/test/txn/LogManagerTest.java | 2 +-
.../common/api/IDatasetLifecycleManager.java | 6 +-
.../common/api/INcApplicationContext.java | 2 +-
.../context/CorrelatedPrefixMergePolicy.java | 25 +--
.../asterix/common/context/DatasetInfo.java | 43 +++--
.../DatasetLSMComponentIdGeneratorFactory.java | 9 +-
.../common/context/DatasetLifecycleManager.java | 192 +++++++++++--------
.../asterix/common/context/DatasetResource.java | 45 +++--
.../context/PrimaryIndexOperationTracker.java | 12 +-
...tractLSMIndexIOOperationCallbackFactory.java | 13 +-
.../LSMBTreeIOOperationCallbackFactory.java | 3 +-
...TreeWithBuddyIOOperationCallbackFactory.java | 3 +-
...InvertedIndexIOOperationCallbackFactory.java | 3 +-
.../LSMRTreeIOOperationCallbackFactory.java | 3 +-
.../transactions/ITransactionContext.java | 8 +-
.../CorrelatedPrefixMergePolicyTest.java | 27 +--
.../apache/asterix/metadata/MetadataNode.java | 5 +-
.../metadata/bootstrap/MetadataBootstrap.java | 2 +-
.../asterix/metadata/entities/Dataset.java | 7 +-
.../job/listener/JobEventListenerFactory.java | 3 +-
...dexModificationOperationCallbackFactory.java | 2 +-
.../PrimaryIndexOperationTrackerFactory.java | 9 +-
...dexModificationOperationCallbackFactory.java | 2 +-
.../SecondaryIndexOperationTrackerFactory.java | 5 +-
.../UpsertOperationCallbackFactory.java | 2 +-
.../resource/DatasetLocalResourceFactory.java | 3 +-
.../management/service/logging/LogBuffer.java | 2 +-
.../transaction/AbstractTransactionContext.java | 2 +-
.../transaction/AtomicTransactionContext.java | 6 +-
.../EntityLevelTransactionContext.java | 44 +++--
.../dataflow/ExternalBTreeLocalResource.java | 4 +-
.../ExternalBTreeWithBuddyLocalResource.java | 4 +-
.../btree/dataflow/LSMBTreeLocalResource.java | 4 +-
.../am/lsm/btree/impls/ExternalBTree.java | 3 +-
.../lsm/btree/impls/ExternalBTreeWithBuddy.java | 4 +-
.../storage/am/lsm/btree/impls/LSMBTree.java | 2 +-
.../am/lsm/btree/utils/LSMBTreeUtil.java | 6 +-
.../api/ILSMComponentIdGeneratorFactory.java | 6 +-
.../api/ILSMIOOperationCallbackFactory.java | 8 +-
.../common/api/ILSMOperationTrackerFactory.java | 4 +-
.../am/lsm/common/impls/AbstractLSMIndex.java | 4 +-
.../impls/LSMComponentIdGeneratorFactory.java | 3 +-
.../impls/NoOpIOOperationCallbackFactory.java | 3 +-
.../impls/NoOpOperationTrackerFactory.java | 3 +-
.../ThreadCountingOperationTrackerFactory.java | 3 +-
.../dataflow/LSMInvertedIndexLocalResource.java | 6 +-
.../dataflow/ExternalRTreeLocalResource.java | 8 +-
.../rtree/dataflow/LSMRTreeLocalResource.java | 9 +-
.../LSMRTreeWithAntiMatterLocalResource.java | 4 +-
.../am/lsm/rtree/impls/AbstractLSMRTree.java | 2 +-
.../am/lsm/rtree/impls/ExternalRTree.java | 2 +-
.../storage/am/lsm/rtree/impls/LSMRTree.java | 2 +-
...MBTreeModificationOperationCallbackTest.java | 2 +-
.../LSMBTreeSearchOperationCallbackTest.java | 2 +-
.../am/lsm/btree/LSMBTreeUpdateInPlaceTest.java | 2 +-
.../btree/impl/TestLsmBtreeLocalResource.java | 4 +-
63 files changed, 408 insertions(+), 270 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2f934e31/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index c554cbd..366438a 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -109,15 +109,15 @@ public class NCAppRuntimeContext implements INcApplicationContext {
private ILSMMergePolicyFactory metadataMergePolicyFactory;
private final INCServiceContext ncServiceContext;
private final IResourceIdFactory resourceIdFactory;
- private CompilerProperties compilerProperties;
- private ExternalProperties externalProperties;
- private MetadataProperties metadataProperties;
- private StorageProperties storageProperties;
- private TransactionProperties txnProperties;
- private ActiveProperties activeProperties;
- private BuildProperties buildProperties;
- private ReplicationProperties replicationProperties;
- private MessagingProperties messagingProperties;
+ private final CompilerProperties compilerProperties;
+ private final ExternalProperties externalProperties;
+ private final MetadataProperties metadataProperties;
+ private final StorageProperties storageProperties;
+ private final TransactionProperties txnProperties;
+ private final ActiveProperties activeProperties;
+ private final BuildProperties buildProperties;
+ private final ReplicationProperties replicationProperties;
+ private final MessagingProperties messagingProperties;
private final NodeProperties nodeProperties;
private ExecutorService threadExecutor;
private IDatasetMemoryManager datasetMemoryManager;
@@ -373,8 +373,8 @@ public class NCAppRuntimeContext implements INcApplicationContext {
}
@Override
- public ILSMOperationTracker getLSMBTreeOperationTracker(int datasetID) {
- return datasetLifecycleManager.getOperationTracker(datasetID);
+ public ILSMOperationTracker getPrimaryOperationTracker(int datasetID, int partition) {
+ return datasetLifecycleManager.getOperationTracker(datasetID, partition);
}
@Override
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2f934e31/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
index a46b029..c6232f5 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
@@ -144,7 +144,7 @@ public class ComponentRollbackTest {
Assert.assertTrue(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
StorageTestUtils.searchAndAssertCount(nc, PARTITION, StorageTestUtils.TOTAL_NUM_OF_RECORDS);
ILSMIndexAccessor lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
- dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID).refresh();
+ dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION).refresh();
((AbstractLSMIOOperationCallback) lsmBtree.getIOOperationCallback()).updateLastLSN(0);
// rollback a memory component
lsmAccessor.deleteComponents(memoryComponentsPredicate);
@@ -153,7 +153,7 @@ public class ComponentRollbackTest {
// rollback the last disk component
lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
long lsn = AbstractLSMIOOperationCallback.getTreeIndexLSN(diskComponents.get(0).getMetadata());
- dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID).refresh();
+ dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION).refresh();
((AbstractLSMIOOperationCallback) lsmBtree.getIOOperationCallback()).updateLastLSN(0);
DiskComponentLsnPredicate pred = new DiskComponentLsnPredicate(lsn);
lsmAccessor.deleteComponents(pred);
@@ -201,7 +201,7 @@ public class ComponentRollbackTest {
Assert.assertTrue(memComponents.get(lsmBtree.getCurrentMemoryComponentIndex()).isModified());
StorageTestUtils.searchAndAssertCount(nc, PARTITION, StorageTestUtils.TOTAL_NUM_OF_RECORDS);
ILSMIndexAccessor lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
- dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID).refresh();
+ dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION).refresh();
((AbstractLSMIOOperationCallback) lsmBtree.getIOOperationCallback()).updateLastLSN(0);
// rollback a memory component
lsmAccessor.deleteComponents(memoryComponentsPredicate);
@@ -227,7 +227,7 @@ public class ComponentRollbackTest {
// rollback the last disk component
lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
long lsn = AbstractLSMIOOperationCallback.getTreeIndexLSN(diskComponents.get(0).getMetadata());
- dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID).refresh();
+ dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION).refresh();
((AbstractLSMIOOperationCallback) lsmBtree.getIOOperationCallback()).updateLastLSN(0);
DiskComponentLsnPredicate pred = new DiskComponentLsnPredicate(lsn);
lsmAccessor.deleteComponents(pred);
@@ -276,7 +276,7 @@ public class ComponentRollbackTest {
firstSearcher.waitUntilEntered();
// now that we enetered, we will rollback
ILSMIndexAccessor lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
- dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID).refresh();
+ dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION).refresh();
((AbstractLSMIOOperationCallback) lsmBtree.getIOOperationCallback()).updateLastLSN(0);
// rollback a memory component
lsmAccessor.deleteComponents(
@@ -298,7 +298,7 @@ public class ComponentRollbackTest {
lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
long lsn = AbstractLSMIOOperationCallback.getTreeIndexLSN(diskComponents.get(0).getMetadata());
- dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID).refresh();
+ dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION).refresh();
((AbstractLSMIOOperationCallback) lsmBtree.getIOOperationCallback()).updateLastLSN(0);
DiskComponentLsnPredicate pred = new DiskComponentLsnPredicate(lsn);
lsmAccessor.deleteComponents(pred);
@@ -702,7 +702,7 @@ public class ComponentRollbackTest {
public void run() {
ILSMIndexAccessor lsmAccessor = lsmBtree.createAccessor(NoOpIndexAccessParameters.INSTANCE);
try {
- dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID).refresh();
+ dsLifecycleMgr.getComponentIdGenerator(StorageTestUtils.DATASET_ID, PARTITION).refresh();
((AbstractLSMIOOperationCallback) lsmBtree.getIOOperationCallback()).updateLastLSN(0);
lsmAccessor.deleteComponents(predicate);
} catch (HyracksDataException e) {
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2f934e31/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/MultiPartitionLSMIndexTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/MultiPartitionLSMIndexTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/MultiPartitionLSMIndexTest.java
index 367d0b9..62705cc 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/MultiPartitionLSMIndexTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/MultiPartitionLSMIndexTest.java
@@ -251,7 +251,7 @@ public class MultiPartitionLSMIndexTest {
/**
* This test update partition 0, schedule flush and modify partition 1
- * Then ensure that in partition 1, primary and secondary have different component ids
+ * Then ensure that in partition 1, primary and secondary have the same component ids
*/
@Test
public void testAllocateWhileFlushIsScheduled() {
@@ -400,7 +400,8 @@ public class MultiPartitionLSMIndexTest {
AtomicBoolean arrivedAtSchduleFlush = new AtomicBoolean(false);
AtomicBoolean finishedSchduleFlush = new AtomicBoolean(false);
MutableBoolean proceedToScheduleFlush = new MutableBoolean(false);
- addOpTrackerCallback(primaryLsmBtrees[0], new ITestOpCallback<Void>() {
+ // keep track of the flush of partition 1 since partitions 0 and 1 are flushed seperately
+ addOpTrackerCallback(primaryLsmBtrees[1], new ITestOpCallback<Void>() {
@Override
public void before(Void t) {
synchronized (arrivedAtSchduleFlush) {
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2f934e31/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBtreeIoOpCallbackFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBtreeIoOpCallbackFactory.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBtreeIoOpCallbackFactory.java
index 4bfc581..c69ffe5 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBtreeIoOpCallbackFactory.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBtreeIoOpCallbackFactory.java
@@ -22,7 +22,6 @@ import java.util.List;
import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallback;
import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
-import org.apache.asterix.common.storage.IIndexCheckpointManager;
import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.lsm.btree.impl.TestLsmBtree;
@@ -52,7 +51,7 @@ public class TestLsmBtreeIoOpCallbackFactory extends LSMBTreeIOOperationCallback
}
@Override
- public synchronized ILSMIOOperationCallback createIoOpCallback(ILSMIndex index) {
+ public synchronized ILSMIOOperationCallback createIoOpCallback(ILSMIndex index) throws HyracksDataException {
completedFlushes = 0;
completedMerges = 0;
rollbackFlushes = 0;
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2f934e31/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTracker.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTracker.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTracker.java
index e376ff9..9a528d3 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTracker.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTracker.java
@@ -33,9 +33,9 @@ public class TestPrimaryIndexOperationTracker extends PrimaryIndexOperationTrack
private final List<ITestOpCallback<Void>> callbacks = new ArrayList<>();
- public TestPrimaryIndexOperationTracker(int datasetID, ILogManager logManager, DatasetInfo dsInfo,
+ public TestPrimaryIndexOperationTracker(int datasetID, int partition, ILogManager logManager, DatasetInfo dsInfo,
ILSMComponentIdGenerator idGenerator) {
- super(datasetID, logManager, dsInfo, idGenerator);
+ super(datasetID, partition, logManager, dsInfo, idGenerator);
}
public void addCallback(ITestOpCallback<Void> callback) {
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2f934e31/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTrackerFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTrackerFactory.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTrackerFactory.java
index 5d7a7c6..e6b34b8 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTrackerFactory.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTrackerFactory.java
@@ -20,19 +20,22 @@ package org.apache.asterix.test.dataflow;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
+import java.util.Map;
import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.asterix.common.context.DatasetLifecycleManager;
import org.apache.asterix.common.context.DatasetResource;
import org.apache.asterix.common.context.PrimaryIndexOperationTracker;
+import org.apache.asterix.common.utils.StoragePathUtil;
import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexOperationTrackerFactory;
import org.apache.hyracks.api.application.INCServiceContext;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
+import org.apache.hyracks.storage.common.IResource;
public class TestPrimaryIndexOperationTrackerFactory extends PrimaryIndexOperationTrackerFactory {
private static final long serialVersionUID = 1L;
- private int datasetId;
+ private final int datasetId;
public TestPrimaryIndexOperationTrackerFactory(int datasetId) {
super(datasetId);
@@ -40,17 +43,19 @@ public class TestPrimaryIndexOperationTrackerFactory extends PrimaryIndexOperati
}
@Override
- public ILSMOperationTracker getOperationTracker(INCServiceContext ctx) {
+ public ILSMOperationTracker getOperationTracker(INCServiceContext ctx, IResource resource) {
try {
INcApplicationContext appCtx = (INcApplicationContext) ctx.getApplicationContext();
DatasetLifecycleManager dslcManager = (DatasetLifecycleManager) appCtx.getDatasetLifecycleManager();
DatasetResource dsr = dslcManager.getDatasetLifecycle(datasetId);
- PrimaryIndexOperationTracker opTracker = dsr.getOpTracker();
+ int partition = StoragePathUtil.getPartitionNumFromRelativePath(resource.getPath());
+ PrimaryIndexOperationTracker opTracker = dslcManager.getOperationTracker(datasetId, partition);
if (!(opTracker instanceof TestPrimaryIndexOperationTracker)) {
- Field opTrackerField = DatasetResource.class.getDeclaredField("datasetPrimaryOpTracker");
- opTracker = new TestPrimaryIndexOperationTracker(datasetId,
- appCtx.getTransactionSubsystem().getLogManager(), dsr.getDatasetInfo(), dsr.getIdGenerator());
- setFinal(opTrackerField, dsr, opTracker);
+ Field opTrackersField = DatasetResource.class.getDeclaredField("datasetPrimaryOpTrackers");
+ opTracker = new TestPrimaryIndexOperationTracker(datasetId, partition,
+ appCtx.getTransactionSubsystem().getLogManager(), dsr.getDatasetInfo(),
+ dslcManager.getComponentIdGenerator(datasetId, partition));
+ replaceMapEntry(opTrackersField, dsr, partition, opTracker);
}
return opTracker;
} catch (Exception e) {
@@ -65,4 +70,14 @@ public class TestPrimaryIndexOperationTrackerFactory extends PrimaryIndexOperati
modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL);
field.set(obj, newValue);
}
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ static void replaceMapEntry(Field field, Object obj, Object key, Object value)
+ throws Exception, IllegalAccessException {
+ field.setAccessible(true);
+ Field modifiersField = Field.class.getDeclaredField("modifiers");
+ modifiersField.setAccessible(true);
+ Map map = (Map) field.get(obj);
+ map.put(key, value);
+ }
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2f934e31/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/metadata/MetadataTxnTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/metadata/MetadataTxnTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/metadata/MetadataTxnTest.java
index a10c234..70e5f6e 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/metadata/MetadataTxnTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/metadata/MetadataTxnTest.java
@@ -42,6 +42,7 @@ import org.apache.asterix.metadata.entities.NodeGroup;
import org.apache.asterix.metadata.utils.DatasetUtil;
import org.apache.asterix.test.common.TestExecutor;
import org.apache.asterix.testframework.context.TestCaseContext;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
import org.apache.hyracks.storage.am.lsm.common.impls.NoMergePolicyFactory;
import org.junit.After;
import org.junit.Assert;
@@ -203,8 +204,10 @@ public class MetadataTxnTest {
((INcApplicationContext) integrationUtil.ncs[0].getApplicationContext()).getDatasetLifecycleManager();
int maxMetadatasetId = 14;
for (int i = 1; i <= maxMetadatasetId; i++) {
- if (datasetLifecycleManager.getIndex(i, i) != null) {
- final PrimaryIndexOperationTracker opTracker = datasetLifecycleManager.getOperationTracker(i);
+ ILSMIndex index = (ILSMIndex) datasetLifecycleManager.getIndex(i, i);
+ if (index != null) {
+ final PrimaryIndexOperationTracker opTracker =
+ (PrimaryIndexOperationTracker) index.getOperationTracker();
Assert.assertEquals(0, opTracker.getNumActiveOperations());
}
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2f934e31/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/LogManagerTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/LogManagerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/LogManagerTest.java
index a1978eb..6a70a29 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/LogManagerTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/txn/LogManagerTest.java
@@ -170,7 +170,7 @@ public class LogManagerTest {
final TransactionOptions options = new TransactionOptions(ITransactionManager.AtomicityLevel.ENTITY_LEVEL);
final ITransactionManager transactionManager = ncAppCtx.getTransactionSubsystem().getTransactionManager();
final ITransactionContext txnCtx = transactionManager.beginTransaction(txnId, options);
- txnCtx.register(resourceId, index, NoOpOperationCallback.INSTANCE, true);
+ txnCtx.register(resourceId, 0, index, NoOpOperationCallback.INSTANCE, true);
return txnCtx;
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2f934e31/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
index 41c5ade..4441c6e 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IDatasetLifecycleManager.java
@@ -75,17 +75,19 @@ public interface IDatasetLifecycleManager extends IResourceLifecycleManager<IInd
* creates (if necessary) and returns the primary index operation tracker of a dataset.
*
* @param datasetId
+ * @param partition
* @return
*/
- PrimaryIndexOperationTracker getOperationTracker(int datasetId);
+ PrimaryIndexOperationTracker getOperationTracker(int datasetId, int partition);
/**
* creates (if necessary) and returns the component Id generator of a dataset.
*
* @param datasetId
+ * @param partition
* @return
*/
- ILSMComponentIdGenerator getComponentIdGenerator(int datasetId);
+ ILSMComponentIdGenerator getComponentIdGenerator(int datasetId, int partition);
/**
* creates (if necessary) and returns the dataset virtual buffer caches.
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2f934e31/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
index 8a83c7b..fffc170 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
@@ -66,7 +66,7 @@ public interface INcApplicationContext extends IApplicationContext {
IResourceIdFactory getResourceIdFactory();
- ILSMOperationTracker getLSMBTreeOperationTracker(int datasetID);
+ ILSMOperationTracker getPrimaryOperationTracker(int datasetID, int partition);
void initialize(boolean initialRun) throws IOException, ACIDException, AlgebricksException;
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2f934e31/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java
index e5fc998..41461ec 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/CorrelatedPrefixMergePolicy.java
@@ -23,7 +23,6 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
-import java.util.stream.Collectors;
import org.apache.asterix.common.api.IDatasetLifecycleManager;
import org.apache.commons.lang3.tuple.Pair;
@@ -92,10 +91,10 @@ public class CorrelatedPrefixMergePolicy extends PrefixMergePolicy {
ILSMComponent leftComponent = immutableComponents.get(mergeableIndexes.getLeft());
ILSMComponent rightComponent = immutableComponents.get(mergeableIndexes.getRight());
ILSMComponentId targetId = LSMComponentIdUtils.union(leftComponent.getId(), rightComponent.getId());
- Set<IndexInfo> indexInfos = datasetLifecycleManager.getDatasetInfo(datasetId).getDatsetIndexInfos();
- int partition = getIndexPartition(index, indexInfos);
- triggerScheduledMerge(targetId,
- indexInfos.stream().filter(info -> info.getPartition() == partition).collect(Collectors.toSet()));
+ int partition = ((PrimaryIndexOperationTracker) index.getOperationTracker()).getPartition();
+ Set<ILSMIndex> indexes =
+ datasetLifecycleManager.getDatasetInfo(datasetId).getDatasetPartitionOpenIndexes(partition);
+ triggerScheduledMerge(targetId, indexes);
return true;
}
@@ -107,11 +106,8 @@ public class CorrelatedPrefixMergePolicy extends PrefixMergePolicy {
* @param indexInfos
* @throws HyracksDataException
*/
- private void triggerScheduledMerge(ILSMComponentId targetId, Set<IndexInfo> indexInfos)
- throws HyracksDataException {
- for (IndexInfo info : indexInfos) {
- ILSMIndex lsmIndex = info.getIndex();
-
+ private void triggerScheduledMerge(ILSMComponentId targetId, Set<ILSMIndex> indexes) throws HyracksDataException {
+ for (ILSMIndex lsmIndex : indexes) {
List<ILSMDiskComponent> immutableComponents = new ArrayList<>(lsmIndex.getDiskComponents());
if (isMergeOngoing(immutableComponents)) {
continue;
@@ -132,13 +128,4 @@ public class CorrelatedPrefixMergePolicy extends PrefixMergePolicy {
accessor.scheduleMerge(lsmIndex.getIOOperationCallback(), mergableComponents);
}
}
-
- private int getIndexPartition(ILSMIndex index, Set<IndexInfo> indexInfos) {
- for (IndexInfo info : indexInfos) {
- if (info.getIndex() == index) {
- return info.getPartition();
- }
- }
- return -1;
- }
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2f934e31/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
index 9d63818..44baf77 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetInfo.java
@@ -30,6 +30,9 @@ import org.apache.logging.log4j.Logger;
public class DatasetInfo extends Info implements Comparable<DatasetInfo> {
private static final Logger LOGGER = LogManager.getLogger();
+ // partition -> index
+ private final Map<Integer, Set<IndexInfo>> partitionIndexes;
+ // resourceID -> index
private final Map<Long, IndexInfo> indexes;
private final int datasetID;
private int numActiveIOOps;
@@ -40,6 +43,7 @@ public class DatasetInfo extends Info implements Comparable<DatasetInfo> {
private boolean durable;
public DatasetInfo(int datasetID) {
+ this.partitionIndexes = new HashMap<>();
this.indexes = new HashMap<>();
this.setLastAccess(-1);
this.datasetID = datasetID;
@@ -69,26 +73,17 @@ public class DatasetInfo extends Info implements Comparable<DatasetInfo> {
notifyAll();
}
- public synchronized Set<ILSMIndex> getDatasetIndexes() {
- Set<ILSMIndex> datasetIndexes = new HashSet<>();
- for (IndexInfo iInfo : getIndexes().values()) {
- if (iInfo.isOpen()) {
- datasetIndexes.add(iInfo.getIndex());
- }
- }
-
- return datasetIndexes;
- }
-
- public synchronized Set<IndexInfo> getDatsetIndexInfos() {
- Set<IndexInfo> infos = new HashSet<>();
- for (IndexInfo iInfo : getIndexes().values()) {
- if (iInfo.isOpen()) {
- infos.add(iInfo);
+ public synchronized Set<ILSMIndex> getDatasetPartitionOpenIndexes(int partition) {
+ Set<ILSMIndex> indexSet = new HashSet<>();
+ Set<IndexInfo> partitionIndexInfos = this.partitionIndexes.get(partition);
+ if (partitionIndexInfos != null) {
+ for (IndexInfo iInfo : partitionIndexInfos) {
+ if (iInfo.isOpen()) {
+ indexSet.add(iInfo.getIndex());
+ }
}
}
-
- return infos;
+ return indexSet;
}
@Override
@@ -160,6 +155,18 @@ public class DatasetInfo extends Info implements Comparable<DatasetInfo> {
return indexes;
}
+ public synchronized void addIndex(long resourceID, IndexInfo indexInfo) {
+ indexes.put(resourceID, indexInfo);
+ partitionIndexes.computeIfAbsent(indexInfo.getPartition(), partition -> new HashSet<>()).add(indexInfo);
+ }
+
+ public synchronized void removeIndex(long resourceID) {
+ IndexInfo info = indexes.remove(resourceID);
+ if (info != null) {
+ partitionIndexes.get(info.getPartition()).remove(info);
+ }
+ }
+
public boolean isRegistered() {
return isRegistered;
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2f934e31/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLSMComponentIdGeneratorFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLSMComponentIdGeneratorFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLSMComponentIdGeneratorFactory.java
index 7b8397c..83e3144 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLSMComponentIdGeneratorFactory.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLSMComponentIdGeneratorFactory.java
@@ -21,9 +21,12 @@ package org.apache.asterix.common.context;
import org.apache.asterix.common.api.IDatasetLifecycleManager;
import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.utils.StoragePathUtil;
import org.apache.hyracks.api.application.INCServiceContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGeneratorFactory;
+import org.apache.hyracks.storage.common.IResource;
/**
* This factory implementation is used by AsterixDB layer so that indexes of a dataset (/partition)
@@ -41,10 +44,12 @@ public class DatasetLSMComponentIdGeneratorFactory implements ILSMComponentIdGen
}
@Override
- public ILSMComponentIdGenerator getComponentIdGenerator(INCServiceContext serviceCtx) {
+ public ILSMComponentIdGenerator getComponentIdGenerator(INCServiceContext serviceCtx, IResource resource)
+ throws HyracksDataException {
IDatasetLifecycleManager dslcManager =
((INcApplicationContext) serviceCtx.getApplicationContext()).getDatasetLifecycleManager();
- return dslcManager.getComponentIdGenerator(datasetId);
+ int partition = StoragePathUtil.getPartitionNumFromRelativePath(resource.getPath());
+ return dslcManager.getComponentIdGenerator(datasetId, partition);
}
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2f934e31/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index 3a70515..1a61b8f 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -21,6 +21,7 @@ package org.apache.asterix.common.context;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -139,7 +140,7 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC
throw HyracksDataException.create(ErrorCode.INDEX_DOES_NOT_EXIST);
}
- PrimaryIndexOperationTracker opTracker = dsr.getOpTracker();
+ PrimaryIndexOperationTracker opTracker = dsr.getOpTracker(iInfo.getPartition());
if (iInfo.getReferenceCount() != 0 || (opTracker != null && opTracker.getNumActiveOperations() != 0)) {
if (LOGGER.isErrorEnabled()) {
final String logMsg = String.format(
@@ -155,7 +156,7 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC
DatasetInfo dsInfo = dsr.getDatasetInfo();
dsInfo.waitForIO();
closeIndex(iInfo);
- dsInfo.getIndexes().remove(resourceID);
+ dsInfo.removeIndex(resourceID);
if (dsInfo.getReferenceCount() == 0 && dsInfo.isOpen() && dsInfo.getIndexes().isEmpty()
&& !dsInfo.isExternal()) {
removeDatasetFromCache(dsInfo.getDatasetID());
@@ -203,10 +204,7 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC
List<DatasetResource> datasetsResources = new ArrayList<>(datasets.values());
Collections.sort(datasetsResources);
for (DatasetResource dsr : datasetsResources) {
- PrimaryIndexOperationTracker opTracker = dsr.getOpTracker();
- if (opTracker != null && opTracker.getNumActiveOperations() == 0
- && dsr.getDatasetInfo().getReferenceCount() == 0 && dsr.getDatasetInfo().isOpen()
- && !dsr.isMetadataDataset()) {
+ if (isCandidateDatasetForEviction(dsr)) {
closeDataset(dsr);
LOGGER.info(() -> "Evicted Dataset" + dsr.getDatasetID());
return true;
@@ -215,14 +213,18 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC
return false;
}
- private static void flushAndWaitForIO(DatasetInfo dsInfo, IndexInfo iInfo) throws HyracksDataException {
- if (iInfo.isOpen()) {
- ILSMIndexAccessor accessor = iInfo.getIndex().createAccessor(NoOpIndexAccessParameters.INSTANCE);
- accessor.scheduleFlush(iInfo.getIndex().getIOOperationCallback());
+ private boolean isCandidateDatasetForEviction(DatasetResource dsr) {
+ for (PrimaryIndexOperationTracker opTracker : dsr.getOpTrackers()) {
+ if (opTracker.getNumActiveOperations() != 0) {
+ return false;
+ }
+ }
+ if (dsr.getDatasetInfo().getReferenceCount() != 0 || !dsr.getDatasetInfo().isOpen()
+ || dsr.isMetadataDataset()) {
+ return false;
}
- // Wait for the above flush op.
- dsInfo.waitForIO();
+ return true;
}
public DatasetResource getDatasetLifecycle(int did) {
@@ -234,12 +236,9 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC
dsr = datasets.get(did);
if (dsr == null) {
DatasetInfo dsInfo = new DatasetInfo(did);
- ILSMComponentIdGenerator idGenerator = new LSMComponentIdGenerator();
- PrimaryIndexOperationTracker opTracker =
- new PrimaryIndexOperationTracker(did, logManager, dsInfo, idGenerator);
DatasetVirtualBufferCaches vbcs = new DatasetVirtualBufferCaches(did, storageProperties,
memoryManager.getNumPages(did), numPartitions);
- dsr = new DatasetResource(dsInfo, opTracker, vbcs, idGenerator);
+ dsr = new DatasetResource(dsInfo, vbcs);
datasets.put(did, dsr);
}
return dsr;
@@ -318,13 +317,33 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC
}
@Override
- public PrimaryIndexOperationTracker getOperationTracker(int datasetId) {
- return datasets.get(datasetId).getOpTracker();
+ public synchronized PrimaryIndexOperationTracker getOperationTracker(int datasetId, int partition) {
+ DatasetResource dataset = datasets.get(datasetId);
+ PrimaryIndexOperationTracker opTracker = dataset.getOpTracker(partition);
+ if (opTracker == null) {
+ populateOpTrackerAndIdGenerator(dataset, partition);
+ opTracker = dataset.getOpTracker(partition);
+ }
+ return opTracker;
}
@Override
- public ILSMComponentIdGenerator getComponentIdGenerator(int datasetId) {
- return datasets.get(datasetId).getIdGenerator();
+ public synchronized ILSMComponentIdGenerator getComponentIdGenerator(int datasetId, int partition) {
+ DatasetResource dataset = datasets.get(datasetId);
+ ILSMComponentIdGenerator generator = dataset.getComponentIdGenerator(partition);
+ if (generator == null) {
+ populateOpTrackerAndIdGenerator(dataset, partition);
+ generator = dataset.getComponentIdGenerator(partition);
+ }
+ return generator;
+ }
+
+ private void populateOpTrackerAndIdGenerator(DatasetResource dataset, int partition) {
+ ILSMComponentIdGenerator idGenerator = new LSMComponentIdGenerator();
+ PrimaryIndexOperationTracker opTracker = new PrimaryIndexOperationTracker(dataset.getDatasetID(), partition,
+ logManager, dataset.getDatasetInfo(), idGenerator);
+ dataset.setPrimaryIndexOperationTracker(partition, opTracker);
+ dataset.setIdGenerator(partition, idGenerator);
}
private void validateDatasetLifecycleManagerState() throws HyracksDataException {
@@ -357,31 +376,40 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC
public synchronized void scheduleAsyncFlushForLaggingDatasets(long targetLSN) throws HyracksDataException {
// schedule a flush for datasets whose min LSN (Log Sequence Number) < targetLSN
for (DatasetResource dsr : datasets.values()) {
- PrimaryIndexOperationTracker opTracker = dsr.getOpTracker();
- synchronized (opTracker) {
- for (IndexInfo iInfo : dsr.getIndexes().values()) {
- AbstractLSMIOOperationCallback ioCallback =
- (AbstractLSMIOOperationCallback) iInfo.getIndex().getIOOperationCallback();
- if (!(iInfo.getIndex().isCurrentMutableComponentEmpty() || ioCallback.hasPendingFlush()
- || opTracker.isFlushLogCreated() || opTracker.isFlushOnExit())) {
- long firstLSN = ioCallback.getFirstLSN();
- if (firstLSN < targetLSN) {
- LOGGER.info("Checkpoint flush dataset {}", dsr.getDatasetID());
- opTracker.setFlushOnExit(true);
- if (opTracker.getNumActiveOperations() == 0) {
- // No Modify operations currently, we need to trigger the flush and we can do so safely
- opTracker.flushIfRequested();
- }
- break;
- }
+ for (PrimaryIndexOperationTracker opTracker : dsr.getOpTrackers()) {
+ // check all partitions
+ synchronized (opTracker) {
+ scheduleAsyncFlushForLaggingDatasetPartition(dsr, opTracker, targetLSN);
+ }
+ }
+ }
+ }
+
+ private void scheduleAsyncFlushForLaggingDatasetPartition(DatasetResource dsr,
+ PrimaryIndexOperationTracker opTracker, long targetLSN) throws HyracksDataException {
+ int partition = opTracker.getPartition();
+ for (ILSMIndex lsmIndex : dsr.getDatasetInfo().getDatasetPartitionOpenIndexes(partition)) {
+ AbstractLSMIOOperationCallback ioCallback =
+ (AbstractLSMIOOperationCallback) lsmIndex.getIOOperationCallback();
+ if (!(lsmIndex.isCurrentMutableComponentEmpty() || ioCallback.hasPendingFlush()
+ || opTracker.isFlushLogCreated() || opTracker.isFlushOnExit())) {
+ long firstLSN = ioCallback.getFirstLSN();
+ if (firstLSN < targetLSN) {
+ LOGGER.info("Checkpoint flush dataset {} partition {}", dsr.getDatasetID(), partition);
+ opTracker.setFlushOnExit(true);
+ if (opTracker.getNumActiveOperations() == 0) {
+ // No Modify operations currently, we need to trigger the flush and we can do so safely
+ opTracker.flushIfRequested();
}
+ break;
}
}
}
}
/*
- * This method can only be called asynchronously safely if we're sure no modify operation will take place until the flush is scheduled
+ * It is only safe to call this method asynchronously if we are sure that no modify
+ * operation will take place until the flush is scheduled.
*/
private void flushDatasetOpenIndexes(DatasetResource dsr, boolean asyncFlush) throws HyracksDataException {
DatasetInfo dsInfo = dsr.getDatasetInfo();
@@ -389,53 +417,61 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC
// no memory components for external dataset
return;
}
- PrimaryIndexOperationTracker primaryOpTracker = dsr.getOpTracker();
- if (primaryOpTracker.getNumActiveOperations() > 0) {
- throw new IllegalStateException(
- "flushDatasetOpenIndexes is called on a dataset with currently active operations");
- }
-
- ILSMComponentIdGenerator idGenerator = getComponentIdGenerator(dsInfo.getDatasetID());
- idGenerator.refresh();
+ for (PrimaryIndexOperationTracker primaryOpTracker : dsr.getOpTrackers()) {
+ // flush each partition one by one
+ if (primaryOpTracker.getNumActiveOperations() > 0) {
+ throw new IllegalStateException(
+ "flushDatasetOpenIndexes is called on a dataset with currently active operations");
+ }
+ int partition = primaryOpTracker.getPartition();
+ Collection<ILSMIndex> indexes = dsInfo.getDatasetPartitionOpenIndexes(partition);
+ ILSMComponentIdGenerator idGenerator = getComponentIdGenerator(dsInfo.getDatasetID(), partition);
+ idGenerator.refresh();
+
+ if (dsInfo.isDurable()) {
+ synchronized (logRecord) {
+ TransactionUtil.formFlushLogRecord(logRecord, dsInfo.getDatasetID(), null);
+ try {
+ logManager.log(logRecord);
+ } catch (ACIDException e) {
+ throw new HyracksDataException("could not write flush log while closing dataset", e);
+ }
- if (dsInfo.isDurable()) {
- synchronized (logRecord) {
- TransactionUtil.formFlushLogRecord(logRecord, dsInfo.getDatasetID(), null);
- try {
- logManager.log(logRecord);
- } catch (ACIDException e) {
- throw new HyracksDataException("could not write flush log while closing dataset", e);
+ try {
+ //notification will come from LogBuffer class (notifyFlushTerminator)
+ logRecord.wait();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw HyracksDataException.create(e);
+ }
}
+ }
+ for (ILSMIndex index : indexes) {
+ //update resource lsn
+ AbstractLSMIOOperationCallback ioOpCallback =
+ (AbstractLSMIOOperationCallback) index.getIOOperationCallback();
+ ioOpCallback.updateLastLSN(logRecord.getLSN());
+ }
- try {
- //notification will come from LogPage class (notifyFlushTerminator)
- logRecord.wait();
- } catch (InterruptedException e) {
- throw new HyracksDataException(e);
+ if (asyncFlush) {
+ for (ILSMIndex index : indexes) {
+ ILSMIndexAccessor accessor = index.createAccessor(NoOpIndexAccessParameters.INSTANCE);
+ accessor.scheduleFlush(index.getIOOperationCallback());
+ }
+ } else {
+ for (ILSMIndex index : indexes) {
+ // TODO: This is not efficient since we flush the indexes sequentially.
+ // Think of a way to allow submitting the flush requests concurrently.
+ // We don't do them concurrently because this may lead to a deadlock scenario
+ // between the DatasetLifecycleManager and the PrimaryIndexOperationTracker.
+ ILSMIndexAccessor accessor = index.createAccessor(NoOpIndexAccessParameters.INSTANCE);
+ accessor.scheduleFlush(index.getIOOperationCallback());
+ // Wait for the above flush op.
+ dsInfo.waitForIO();
}
}
}
- for (IndexInfo iInfo : dsInfo.getIndexes().values()) {
- //update resource lsn
- AbstractLSMIOOperationCallback ioOpCallback =
- (AbstractLSMIOOperationCallback) iInfo.getIndex().getIOOperationCallback();
- ioOpCallback.updateLastLSN(logRecord.getLSN());
- }
-
- if (asyncFlush) {
- for (IndexInfo iInfo : dsInfo.getIndexes().values()) {
- ILSMIndexAccessor accessor = iInfo.getIndex().createAccessor(NoOpIndexAccessParameters.INSTANCE);
- accessor.scheduleFlush(iInfo.getIndex().getIOOperationCallback());
- }
- } else {
- for (IndexInfo iInfo : dsInfo.getIndexes().values()) {
- // TODO: This is not efficient since we flush the indexes sequentially.
- // Think of a way to allow submitting the flush requests concurrently. We don't do them concurrently because this
- // may lead to a deadlock scenario between the DatasetLifeCycleManager and the PrimaryIndexOperationTracker.
- flushAndWaitForIO(dsInfo, iInfo);
- }
- }
}
private void closeDataset(DatasetResource dsr) throws HyracksDataException {
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2f934e31/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
index c02de7e..8dcae23 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
@@ -18,6 +18,8 @@
*/
package org.apache.asterix.common.context;
+import java.util.Collection;
+import java.util.HashMap;
import java.util.Map;
import org.apache.asterix.common.dataflow.DatasetLocalResource;
@@ -41,17 +43,16 @@ import org.apache.hyracks.storage.common.LocalResource;
*/
public class DatasetResource implements Comparable<DatasetResource> {
private final DatasetInfo datasetInfo;
- private final PrimaryIndexOperationTracker datasetPrimaryOpTracker;
private final DatasetVirtualBufferCaches datasetVirtualBufferCaches;
- private final ILSMComponentIdGenerator datasetComponentIdGenerator;
- public DatasetResource(DatasetInfo datasetInfo, PrimaryIndexOperationTracker datasetPrimaryOpTracker,
- DatasetVirtualBufferCaches datasetVirtualBufferCaches,
- ILSMComponentIdGenerator datasetComponentIdGenerator) {
+ private final Map<Integer, PrimaryIndexOperationTracker> datasetPrimaryOpTrackers;
+ private final Map<Integer, ILSMComponentIdGenerator> datasetComponentIdGenerators;
+
+ public DatasetResource(DatasetInfo datasetInfo, DatasetVirtualBufferCaches datasetVirtualBufferCaches) {
this.datasetInfo = datasetInfo;
- this.datasetPrimaryOpTracker = datasetPrimaryOpTracker;
this.datasetVirtualBufferCaches = datasetVirtualBufferCaches;
- this.datasetComponentIdGenerator = datasetComponentIdGenerator;
+ this.datasetPrimaryOpTrackers = new HashMap<>();
+ this.datasetComponentIdGenerators = new HashMap<>();
}
public boolean isRegistered() {
@@ -108,7 +109,8 @@ public class DatasetResource implements Comparable<DatasetResource> {
if (index == null) {
throw new HyracksDataException("Attempt to register a null index");
}
- datasetInfo.getIndexes().put(resourceID, new IndexInfo(index, datasetInfo.getDatasetID(), resource,
+
+ datasetInfo.addIndex(resourceID, new IndexInfo(index, datasetInfo.getDatasetID(), resource,
((DatasetLocalResource) resource.getResource()).getPartition()));
}
@@ -116,12 +118,31 @@ public class DatasetResource implements Comparable<DatasetResource> {
return datasetInfo;
}
- public PrimaryIndexOperationTracker getOpTracker() {
- return datasetPrimaryOpTracker;
+ public PrimaryIndexOperationTracker getOpTracker(int partition) {
+ return datasetPrimaryOpTrackers.get(partition);
+ }
+
+ public Collection<PrimaryIndexOperationTracker> getOpTrackers() {
+ return datasetPrimaryOpTrackers.values();
+ }
+
+ public ILSMComponentIdGenerator getComponentIdGenerator(int partition) {
+ return datasetComponentIdGenerators.get(partition);
}
- public ILSMComponentIdGenerator getIdGenerator() {
- return datasetComponentIdGenerator;
+ public void setPrimaryIndexOperationTracker(int partition, PrimaryIndexOperationTracker opTracker) {
+ if (datasetPrimaryOpTrackers.containsKey(partition)) {
+ throw new IllegalStateException(
+ "PrimaryIndexOperationTracker has already been set for partition " + partition);
+ }
+ datasetPrimaryOpTrackers.put(partition, opTracker);
+ }
+
+ public void setIdGenerator(int partition, ILSMComponentIdGenerator idGenerator) {
+ if (datasetComponentIdGenerators.containsKey(partition)) {
+ throw new IllegalStateException("LSMComponentIdGenerator has already been set for partition " + partition);
+ }
+ datasetComponentIdGenerators.put(partition, idGenerator);
}
@Override
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2f934e31/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
index 5170310..1a76b66 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
@@ -43,6 +43,7 @@ import org.apache.hyracks.storage.common.ISearchOperationCallback;
public class PrimaryIndexOperationTracker extends BaseOperationTracker {
+ private final int partition;
// Number of active operations on an ILSMIndex instance.
private final AtomicInteger numActiveOperations;
private final ILogManager logManager;
@@ -50,9 +51,10 @@ public class PrimaryIndexOperationTracker extends BaseOperationTracker {
private boolean flushOnExit = false;
private boolean flushLogCreated = false;
- public PrimaryIndexOperationTracker(int datasetID, ILogManager logManager, DatasetInfo dsInfo,
+ public PrimaryIndexOperationTracker(int datasetID, int partition, ILogManager logManager, DatasetInfo dsInfo,
ILSMComponentIdGenerator idGenerator) {
super(datasetID, dsInfo);
+ this.partition = partition;
this.logManager = logManager;
this.numActiveOperations = new AtomicInteger();
this.idGenerator = idGenerator;
@@ -100,7 +102,7 @@ public class PrimaryIndexOperationTracker extends BaseOperationTracker {
// or if there is a flush scheduled by the checkpoint (flushOnExit), then schedule it
boolean needsFlush = false;
- Set<ILSMIndex> indexes = dsInfo.getDatasetIndexes();
+ Set<ILSMIndex> indexes = dsInfo.getDatasetPartitionOpenIndexes(partition);
if (!flushOnExit) {
for (ILSMIndex lsmIndex : indexes) {
@@ -146,7 +148,7 @@ public class PrimaryIndexOperationTracker extends BaseOperationTracker {
// This method is called sequentially by LogBuffer.notifyFlushTerminator, in the order in which the flushes were scheduled.
public synchronized void triggerScheduleFlush(LogRecord logRecord) throws HyracksDataException {
idGenerator.refresh();
- for (ILSMIndex lsmIndex : dsInfo.getDatasetIndexes()) {
+ for (ILSMIndex lsmIndex : dsInfo.getDatasetPartitionOpenIndexes(partition)) {
//get resource
ILSMIndexAccessor accessor = lsmIndex.createAccessor(NoOpIndexAccessParameters.INSTANCE);
//update resource lsn
@@ -199,4 +201,8 @@ public class PrimaryIndexOperationTracker extends BaseOperationTracker {
return flushLogCreated;
}
+ public int getPartition() {
+ return partition;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2f934e31/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIndexIOOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIndexIOOperationCallbackFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIndexIOOperationCallbackFactory.java
index ed56ab1..5b9883c 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIndexIOOperationCallbackFactory.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIndexIOOperationCallbackFactory.java
@@ -24,11 +24,13 @@ import java.io.ObjectStreamException;
import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
import org.apache.hyracks.api.application.INCServiceContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGeneratorFactory;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallbackFactory;
import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId;
+import org.apache.hyracks.storage.common.IResource;
public abstract class AbstractLSMIndexIOOperationCallbackFactory implements ILSMIOOperationCallbackFactory {
@@ -38,17 +40,20 @@ public abstract class AbstractLSMIndexIOOperationCallbackFactory implements ILSM
protected transient INCServiceContext ncCtx;
+ protected transient IResource resource;
+
public AbstractLSMIndexIOOperationCallbackFactory(ILSMComponentIdGeneratorFactory idGeneratorFactory) {
this.idGeneratorFactory = idGeneratorFactory;
}
@Override
- public void initialize(INCServiceContext ncCtx) {
+ public void initialize(INCServiceContext ncCtx, IResource resource) {
this.ncCtx = ncCtx;
+ this.resource = resource;
}
- protected ILSMComponentIdGenerator getComponentIdGenerator() {
- return idGeneratorFactory.getComponentIdGenerator(ncCtx);
+ protected ILSMComponentIdGenerator getComponentIdGenerator() throws HyracksDataException {
+ return idGeneratorFactory.getComponentIdGenerator(ncCtx, resource);
}
protected IIndexCheckpointManagerProvider getIndexCheckpointManagerProvider() {
@@ -60,7 +65,7 @@ public abstract class AbstractLSMIndexIOOperationCallbackFactory implements ILSM
private static final long serialVersionUID = 1L;
@Override
- public ILSMComponentIdGenerator getComponentIdGenerator(INCServiceContext serviceCtx) {
+ public ILSMComponentIdGenerator getComponentIdGenerator(INCServiceContext serviceCtx, IResource resource) {
// used for backward compatibility
// if idGeneratorFactory is not set for legacy lsm indexes, we return a default
// component id generator which always generates the missing component id.
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2f934e31/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallbackFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallbackFactory.java
index 95245cb..97badb2 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallbackFactory.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallbackFactory.java
@@ -19,6 +19,7 @@
package org.apache.asterix.common.ioopcallbacks;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGeneratorFactory;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
@@ -32,7 +33,7 @@ public class LSMBTreeIOOperationCallbackFactory extends AbstractLSMIndexIOOperat
}
@Override
- public ILSMIOOperationCallback createIoOpCallback(ILSMIndex index) {
+ public ILSMIOOperationCallback createIoOpCallback(ILSMIndex index) throws HyracksDataException {
return new LSMBTreeIOOperationCallback(index, getComponentIdGenerator(), getIndexCheckpointManagerProvider());
}
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2f934e31/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallbackFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallbackFactory.java
index 6c75ed6..9b32345 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallbackFactory.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallbackFactory.java
@@ -18,6 +18,7 @@
*/
package org.apache.asterix.common.ioopcallbacks;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGeneratorFactory;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
@@ -31,7 +32,7 @@ public class LSMBTreeWithBuddyIOOperationCallbackFactory extends AbstractLSMInde
}
@Override
- public ILSMIOOperationCallback createIoOpCallback(ILSMIndex index) {
+ public ILSMIOOperationCallback createIoOpCallback(ILSMIndex index) throws HyracksDataException {
return new LSMBTreeWithBuddyIOOperationCallback(index, getComponentIdGenerator(),
getIndexCheckpointManagerProvider());
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2f934e31/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallbackFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallbackFactory.java
index fb73d19..766ef95 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallbackFactory.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallbackFactory.java
@@ -19,6 +19,7 @@
package org.apache.asterix.common.ioopcallbacks;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGeneratorFactory;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
@@ -32,7 +33,7 @@ public class LSMInvertedIndexIOOperationCallbackFactory extends AbstractLSMIndex
}
@Override
- public ILSMIOOperationCallback createIoOpCallback(ILSMIndex index) {
+ public ILSMIOOperationCallback createIoOpCallback(ILSMIndex index) throws HyracksDataException {
return new LSMInvertedIndexIOOperationCallback(index, getComponentIdGenerator(),
getIndexCheckpointManagerProvider());
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2f934e31/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallbackFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallbackFactory.java
index 94be0bb..3a0afa8 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallbackFactory.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallbackFactory.java
@@ -19,6 +19,7 @@
package org.apache.asterix.common.ioopcallbacks;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGeneratorFactory;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
@@ -32,7 +33,7 @@ public class LSMRTreeIOOperationCallbackFactory extends AbstractLSMIndexIOOperat
}
@Override
- public ILSMIOOperationCallback createIoOpCallback(ILSMIndex index) {
+ public ILSMIOOperationCallback createIoOpCallback(ILSMIndex index) throws HyracksDataException {
return new LSMRTreeIOOperationCallback(index, getComponentIdGenerator(), getIndexCheckpointManagerProvider());
}
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2f934e31/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionContext.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionContext.java
index c4a2d03..a3d5bc5 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionContext.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionContext.java
@@ -38,11 +38,13 @@ public interface ITransactionContext {
* transaction.
*
* @param resourceId
+ * @param partition
* @param index
* @param callback
* @param primaryIndex
*/
- void register(long resourceId, ILSMIndex index, IModificationOperationCallback callback, boolean primaryIndex);
+ void register(long resourceId, int partition, ILSMIndex index, IModificationOperationCallback callback,
+ boolean primaryIndex);
/**
* Gets the unique transaction id.
@@ -135,8 +137,10 @@ public interface ITransactionContext {
* Called to notify the transaction that an entity commit
* log belonging to this transaction has been flushed to
* disk.
+ *
+ * @param partition
*/
- void notifyEntityCommitted();
+ void notifyEntityCommitted(int partition);
/**
* Called after an operation is performed on index
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2f934e31/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java
index 3aa7b17..f9f742a 100644
--- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java
+++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java
@@ -22,15 +22,14 @@ package org.apache.asterix.test.context;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import org.apache.asterix.common.api.IDatasetLifecycleManager;
import org.apache.asterix.common.context.CorrelatedPrefixMergePolicy;
import org.apache.asterix.common.context.DatasetInfo;
import org.apache.asterix.common.context.IndexInfo;
+import org.apache.asterix.common.context.PrimaryIndexOperationTracker;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent.ComponentState;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
@@ -60,6 +59,8 @@ public class CorrelatedPrefixMergePolicyTest extends TestCase {
private final int DATASET_ID = 1;
+ private long nextResourceId = 0;
+
@Test
public void testBasic() {
try {
@@ -183,19 +184,15 @@ public class CorrelatedPrefixMergePolicyTest extends TestCase {
}
}
- private ILSMMergePolicy mockMergePolicy(IndexInfo... indexes) {
+ private ILSMMergePolicy mockMergePolicy(IndexInfo... indexInfos) {
Map<String, String> properties = new HashMap<>();
properties.put("max-tolerance-component-count", String.valueOf(MAX_COMPONENT_COUNT));
properties.put("max-mergable-component-size", String.valueOf(MAX_COMPONENT_SIZE));
- Set<IndexInfo> indexInfos = new HashSet<>();
- for (IndexInfo info : indexes) {
- indexInfos.add(info);
+ DatasetInfo dsInfo = new DatasetInfo(DATASET_ID);
+ for (IndexInfo index : indexInfos) {
+ dsInfo.addIndex(index.getResourceId(), index);
}
-
- DatasetInfo dsInfo = Mockito.mock(DatasetInfo.class);
- Mockito.when(dsInfo.getDatsetIndexInfos()).thenReturn(indexInfos);
-
IDatasetLifecycleManager manager = Mockito.mock(IDatasetLifecycleManager.class);
Mockito.when(manager.getDatasetInfo(DATASET_ID)).thenReturn(dsInfo);
@@ -238,8 +235,16 @@ public class CorrelatedPrefixMergePolicyTest extends TestCase {
Mockito.when(index.createAccessor(Mockito.any(IIndexAccessParameters.class))).thenReturn(accessor);
Mockito.when(index.isPrimaryIndex()).thenReturn(isPrimary);
+ if (isPrimary) {
+ PrimaryIndexOperationTracker opTracker = Mockito.mock(PrimaryIndexOperationTracker.class);
+ Mockito.when(opTracker.getPartition()).thenReturn(partition);
+ Mockito.when(index.getOperationTracker()).thenReturn(opTracker);
+ }
final LocalResource localResource = Mockito.mock(LocalResource.class);
- return new IndexInfo(index, DATASET_ID, localResource, partition);
+ Mockito.when(localResource.getId()).thenReturn(nextResourceId++);
+ IndexInfo indexInfo = new IndexInfo(index, DATASET_ID, localResource, partition);
+ indexInfo.setOpen(true);
+ return indexInfo;
}
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2f934e31/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
index 6634e51..e8f2595 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
@@ -42,6 +42,7 @@ import org.apache.asterix.common.transactions.ITransactionSubsystem;
import org.apache.asterix.common.transactions.ImmutableDatasetId;
import org.apache.asterix.common.transactions.TransactionOptions;
import org.apache.asterix.common.transactions.TxnId;
+import org.apache.asterix.common.utils.StoragePathUtil;
import org.apache.asterix.external.indexing.ExternalFile;
import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
import org.apache.asterix.metadata.api.ExtensionMetadataDataset;
@@ -474,7 +475,9 @@ public class MetadataNode implements IMetadataNode {
IIndexAccessParameters iap = new IndexAccessParameters(modCallback, NoOpOperationCallback.INSTANCE);
ILSMIndexAccessor indexAccessor = lsmIndex.createAccessor(iap);
txnCtx.setWriteTxn(true);
- txnCtx.register(metadataIndex.getResourceId(), lsmIndex, modCallback, metadataIndex.isPrimaryIndex());
+ txnCtx.register(metadataIndex.getResourceId(),
+ StoragePathUtil.getPartitionNumFromRelativePath(resourceName), lsmIndex, modCallback,
+ metadataIndex.isPrimaryIndex());
LSMIndexUtil.checkAndSetFirstLSN((AbstractLSMIndex) lsmIndex, transactionSubsystem.getLogManager());
switch (op) {
case INSERT:
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2f934e31/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
index 9753bcf..9ebd21b 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
@@ -63,7 +63,6 @@ import org.apache.asterix.metadata.utils.MetadataConstants;
import org.apache.asterix.metadata.utils.MetadataUtil;
import org.apache.asterix.om.types.IAType;
import org.apache.asterix.runtime.formats.NonTaggedDataFormat;
-import org.apache.asterix.runtime.utils.ClusterStateManager;
import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexOperationTrackerFactory;
import org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexOperationTrackerFactory;
import org.apache.asterix.transaction.management.resource.DatasetLocalResourceFactory;
@@ -476,4 +475,5 @@ public class MetadataBootstrap {
public static void setNewUniverse(boolean isNewUniverse) {
MetadataBootstrap.isNewUniverse = isNewUniverse;
}
+
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2f934e31/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
index ea2d715..8cd7053 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
@@ -42,6 +42,7 @@ import org.apache.asterix.common.metadata.IDataset;
import org.apache.asterix.common.transactions.IRecoveryManager.ResourceType;
import org.apache.asterix.common.utils.JobUtils;
import org.apache.asterix.common.utils.JobUtils.ProgressState;
+import org.apache.asterix.common.utils.StoragePathUtil;
import org.apache.asterix.external.feed.management.FeedConnectionId;
import org.apache.asterix.external.indexing.IndexingConstants;
import org.apache.asterix.formats.nontagged.BinaryHashFunctionFactoryProvider;
@@ -818,6 +819,10 @@ public class Dataset implements IMetadataEntity<Dataset>, IDataset {
protected int[] getDatasetPartitions(MetadataProvider metadataProvider) throws AlgebricksException {
FileSplit[] splitsForDataset =
metadataProvider.splitsForIndex(metadataProvider.getMetadataTxnContext(), this, getDatasetName());
- return IntStream.range(0, splitsForDataset.length).toArray();
+ int[] partitions = new int[splitsForDataset.length];
+ for (int i = 0; i < partitions.length; i++) {
+ partitions[i] = StoragePathUtil.getPartitionNumFromRelativePath(splitsForDataset[i].getPath());
+ }
+ return partitions;
}
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2f934e31/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/JobEventListenerFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/JobEventListenerFactory.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/JobEventListenerFactory.java
index 7913d48..6faffc7 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/JobEventListenerFactory.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/listener/JobEventListenerFactory.java
@@ -18,13 +18,12 @@
*/
package org.apache.asterix.runtime.job.listener;
-import static org.apache.asterix.common.transactions.ITransactionManager.AtomicityLevel;
-
import org.apache.asterix.common.api.IJobEventListenerFactory;
import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.transactions.ITransactionContext;
import org.apache.asterix.common.transactions.ITransactionManager;
+import org.apache.asterix.common.transactions.ITransactionManager.AtomicityLevel;
import org.apache.asterix.common.transactions.TransactionOptions;
import org.apache.asterix.common.transactions.TxnId;
import org.apache.hyracks.api.context.IHyracksJobletContext;
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2f934e31/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
index d057c50..10c6b8f 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
@@ -73,7 +73,7 @@ public class PrimaryIndexModificationOperationCallbackFactory extends AbstractOp
IModificationOperationCallback modCallback = new PrimaryIndexModificationOperationCallback(
new DatasetId(datasetId), primaryKeyFields, txnCtx, txnSubsystem.getLockManager(), txnSubsystem,
resource.getId(), aResource.getPartition(), resourceType, indexOp, operatorNodePushable);
- txnCtx.register(resource.getId(), index, modCallback, true);
+ txnCtx.register(resource.getId(), aResource.getPartition(), index, modCallback, true);
return modCallback;
} catch (ACIDException e) {
throw HyracksDataException.create(e);
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2f934e31/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexOperationTrackerFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexOperationTrackerFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexOperationTrackerFactory.java
index f40140a..eef1cb0 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexOperationTrackerFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexOperationTrackerFactory.java
@@ -21,9 +21,12 @@ package org.apache.asterix.transaction.management.opcallbacks;
import org.apache.asterix.common.api.IDatasetLifecycleManager;
import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.utils.StoragePathUtil;
import org.apache.hyracks.api.application.INCServiceContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTrackerFactory;
+import org.apache.hyracks.storage.common.IResource;
public class PrimaryIndexOperationTrackerFactory implements ILSMOperationTrackerFactory {
@@ -36,10 +39,12 @@ public class PrimaryIndexOperationTrackerFactory implements ILSMOperationTracker
}
@Override
- public ILSMOperationTracker getOperationTracker(INCServiceContext ctx) {
+ public ILSMOperationTracker getOperationTracker(INCServiceContext ctx, IResource resource)
+ throws HyracksDataException {
IDatasetLifecycleManager dslcManager =
((INcApplicationContext) ctx.getApplicationContext()).getDatasetLifecycleManager();
- return dslcManager.getOperationTracker(datasetId);
+ int partition = StoragePathUtil.getPartitionNumFromRelativePath(resource.getPath());
+ return dslcManager.getOperationTracker(datasetId, partition);
}
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/2f934e31/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
index 26e1b22..a927da0 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
@@ -69,7 +69,7 @@ public class SecondaryIndexModificationOperationCallbackFactory extends Abstract
IModificationOperationCallback modCallback = new SecondaryIndexModificationOperationCallback(
new DatasetId(datasetId), primaryKeyFields, txnCtx, txnSubsystem.getLockManager(), txnSubsystem,
resource.getId(), aResource.getPartition(), resourceType, indexOp);
- txnCtx.register(resource.getId(), index, modCallback, false);
+ txnCtx.register(resource.getId(), aResource.getPartition(), index, modCallback, false);
return modCallback;
} catch (ACIDException e) {
throw HyracksDataException.create(e);