You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by im...@apache.org on 2016/04/07 17:00:23 UTC
[48/50] [abbrv] incubator-asterixdb git commit: Merge remote-tracking
branch 'hyracks-local/master' into hyracks-merge2
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e928b6ac/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ExternalIndexHarness.java
----------------------------------------------------------------------
diff --cc hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ExternalIndexHarness.java
index 8e39223,0000000..e4be66b
mode 100644,000000..100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ExternalIndexHarness.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/ExternalIndexHarness.java
@@@ -1,368 -1,0 +1,379 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.common.impls;
+
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.replication.IReplicationJob.ReplicationOperation;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.storage.am.common.api.IIndexCursor;
+import org.apache.hyracks.storage.am.common.api.ISearchPredicate;
+import org.apache.hyracks.storage.am.common.api.IndexException;
+import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexInternal;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
+import org.apache.hyracks.storage.am.lsm.common.api.ITwoPCIndex;
+import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
+
+public class ExternalIndexHarness extends LSMHarness {
+ private static final Logger LOGGER = Logger.getLogger(ExternalIndexHarness.class.getName());
+
- public ExternalIndexHarness(ILSMIndexInternal lsmIndex, ILSMMergePolicy mergePolicy,
- ILSMOperationTracker opTracker, boolean replicationEnabled) {
++ public ExternalIndexHarness(ILSMIndexInternal lsmIndex, ILSMMergePolicy mergePolicy, ILSMOperationTracker opTracker,
++ boolean replicationEnabled) {
+ super(lsmIndex, mergePolicy, opTracker, replicationEnabled);
+ }
+
+ @Override
+ protected boolean getAndEnterComponents(ILSMIndexOperationContext ctx, LSMOperationType opType,
+ boolean isTryOperation) throws HyracksDataException {
++ validateOperationEnterComponentsState(ctx);
+ synchronized (opTracker) {
+ while (true) {
+ lsmIndex.getOperationalComponents(ctx);
+ // Before entering the components, prune those corner cases that indeed should not proceed.
+ switch (opType) {
+ case MERGE:
+ if (ctx.getComponentHolder().size() < 2) {
+ // There is only a single component. There is nothing to merge.
+ return false;
+ }
+ default:
+ break;
+ }
+ if (enterComponents(ctx, opType)) {
+ return true;
+ } else if (isTryOperation) {
+ return false;
+ }
+ }
+ }
+ }
+
+ @Override
+ protected boolean enterComponents(ILSMIndexOperationContext ctx, LSMOperationType opType)
+ throws HyracksDataException {
++ validateOperationEnterComponentsState(ctx);
+ List<ILSMComponent> components = ctx.getComponentHolder();
+ int numEntered = 0;
+ boolean entranceSuccessful = false;
+ try {
+ for (ILSMComponent c : components) {
+ if (!c.threadEnter(opType, false)) {
+ break;
+ }
+ numEntered++;
+ }
+ entranceSuccessful = numEntered == components.size();
+ } finally {
+ if (!entranceSuccessful) {
+ for (ILSMComponent c : components) {
+ if (numEntered == 0) {
+ break;
+ }
+ c.threadExit(opType, true, false);
+ numEntered--;
+ }
+ return false;
+ }
++ ctx.setAccessingComponents(true);
+ }
+ // Check if there is any action that is needed to be taken based on the operation type
+ switch (opType) {
+ case MERGE:
+ lsmIndex.getIOOperationCallback().beforeOperation(LSMOperationType.MERGE);
+ default:
+ break;
+ }
+ opTracker.beforeOperation(lsmIndex, opType, ctx.getSearchOperationCallback(), ctx.getModificationCallback());
+ return true;
+ }
+
+ private void exitComponents(ILSMIndexOperationContext ctx, LSMOperationType opType, ILSMComponent newComponent,
+ boolean failedOperation) throws HyracksDataException, IndexException {
++ /**
++ * FLUSH and MERGE operations should always exit the components
++ * to notify waiting threads.
++ */
++ if (!ctx.isAccessingComponents() && opType != LSMOperationType.FLUSH && opType != LSMOperationType.MERGE) {
++ return;
++ }
+ synchronized (opTracker) {
+ try {
+ // First check if there is any action that is needed to be taken based on the state of each component.
+ for (ILSMComponent c : ctx.getComponentHolder()) {
+ c.threadExit(opType, failedOperation, false);
+ switch (c.getState()) {
+ case INACTIVE:
+ if (replicationEnabled) {
+ componentsToBeReplicated.clear();
+ componentsToBeReplicated.add(c);
+ lsmIndex.scheduleReplication(null, componentsToBeReplicated, false,
+ ReplicationOperation.DELETE, opType);
+ }
+ ((AbstractDiskLSMComponent) c).destroy();
+ break;
+ default:
+ break;
+ }
+ }
++ ctx.setAccessingComponents(false);
+ // Then, perform any action that is needed to be taken based on the operation type.
+ switch (opType) {
+ case MERGE:
+ // newComponent is null if the merge op. was not performed.
+ if (newComponent != null) {
+ beforeSubsumeMergedComponents(newComponent, ctx.getComponentHolder());
+ lsmIndex.subsumeMergedComponents(newComponent, ctx.getComponentHolder());
+ if (replicationEnabled) {
+ componentsToBeReplicated.clear();
+ componentsToBeReplicated.add(newComponent);
+ triggerReplication(componentsToBeReplicated, false, opType);
+ }
+ mergePolicy.diskComponentAdded(lsmIndex, fullMergeIsRequested.get());
+ }
+ break;
+ default:
+ break;
+ }
+ } finally {
+ opTracker.afterOperation(lsmIndex, opType, ctx.getSearchOperationCallback(),
+ ctx.getModificationCallback());
+ }
+ }
+ }
+
+ @Override
- public void forceModify(ILSMIndexOperationContext ctx, ITupleReference tuple) throws HyracksDataException,
- IndexException {
++ public void forceModify(ILSMIndexOperationContext ctx, ITupleReference tuple)
++ throws HyracksDataException, IndexException {
+ throw new IndexException("2PC LSM Inedx doesn't support modify");
+ }
+
+ @Override
+ public boolean modify(ILSMIndexOperationContext ctx, boolean tryOperation, ITupleReference tuple)
+ throws HyracksDataException, IndexException {
+ throw new IndexException("2PC LSM Inedx doesn't support modify");
+ }
+
+ @Override
+ public void search(ILSMIndexOperationContext ctx, IIndexCursor cursor, ISearchPredicate pred)
+ throws HyracksDataException, IndexException {
+ LSMOperationType opType = LSMOperationType.SEARCH;
+ getAndEnterComponents(ctx, opType, false);
+ try {
+ lsmIndex.search(ctx, cursor, pred);
+ } catch (HyracksDataException | IndexException e) {
+ exitComponents(ctx, opType, null, true);
+ throw e;
+ }
+ }
+
+ @Override
+ public void endSearch(ILSMIndexOperationContext ctx) throws HyracksDataException {
+ if (ctx.getOperation() == IndexOperation.SEARCH) {
+ try {
+ exitComponents(ctx, LSMOperationType.SEARCH, null, false);
+ } catch (IndexException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ }
+
+ @Override
+ public void scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
+ throws HyracksDataException, IndexException {
+ if (!getAndEnterComponents(ctx, LSMOperationType.MERGE, true)) {
+ callback.afterFinalize(LSMOperationType.MERGE, null);
+ return;
+ }
+ lsmIndex.scheduleMerge(ctx, callback);
+ }
+
+ @Override
+ public void scheduleFullMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
+ throws HyracksDataException, IndexException {
+ fullMergeIsRequested.set(true);
+ if (!getAndEnterComponents(ctx, LSMOperationType.MERGE, true)) {
+ // If the merge cannot be scheduled because there is already an ongoing merge on subset/all of the components, then
+ // whenever the current merge has finished, it will schedule the full merge again.
+ callback.afterFinalize(LSMOperationType.MERGE, null);
+ return;
+ }
+ fullMergeIsRequested.set(false);
+ lsmIndex.scheduleMerge(ctx, callback);
+ }
+
+ @Override
- public void merge(ILSMIndexOperationContext ctx, ILSMIOOperation operation) throws HyracksDataException,
- IndexException {
++ public void merge(ILSMIndexOperationContext ctx, ILSMIOOperation operation)
++ throws HyracksDataException, IndexException {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Started a merge operation for index: " + lsmIndex + " ...");
+ }
+
+ ILSMComponent newComponent = null;
+ try {
+ newComponent = lsmIndex.merge(operation);
+ operation.getCallback().afterOperation(LSMOperationType.MERGE, ctx.getComponentHolder(), newComponent);
+ lsmIndex.markAsValid(newComponent);
+ } finally {
+ exitComponents(ctx, LSMOperationType.MERGE, newComponent, false);
+ operation.getCallback().afterFinalize(LSMOperationType.MERGE, newComponent);
+ }
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Finished the merge operation for index: " + lsmIndex);
+ }
+ }
+
+ @Override
+ public void addBulkLoadedComponent(ILSMComponent c) throws HyracksDataException, IndexException {
+ lsmIndex.markAsValid(c);
+ synchronized (opTracker) {
+ lsmIndex.addComponent(c);
+ if (replicationEnabled) {
+ componentsToBeReplicated.clear();
+ componentsToBeReplicated.add(c);
+ triggerReplication(componentsToBeReplicated, true, LSMOperationType.MERGE);
+ }
+ // Enter the component
+ enterComponent(c);
+ mergePolicy.diskComponentAdded(lsmIndex, false);
+ }
+ }
+
+ // Three differences from addBulkLoadedComponent
+ // 1. this needs synchronization since others might be accessing the index (specifically merge operations that might change the lists of components)
+ // 2. the actions taken by the index itself are different
+ // 3. the component has already been marked valid by the bulk update operation
+ public void addTransactionComponents(ILSMComponent newComponent) throws HyracksDataException, IndexException {
+ ITwoPCIndex index = (ITwoPCIndex) lsmIndex;
+ synchronized (opTracker) {
+ List<ILSMComponent> newerList;
+ List<ILSMComponent> olderList;
+ if (index.getCurrentVersion() == 0) {
+ newerList = index.getFirstComponentList();
+ olderList = index.getSecondComponentList();
+ } else {
+ newerList = index.getSecondComponentList();
+ olderList = index.getFirstComponentList();
+ }
+ // Exit components in old version of the index so they are ready to be
+ // deleted if they are not needed anymore
+ for (ILSMComponent c : olderList) {
+ exitComponent(c);
+ }
+ // Enter components in the newer list
+ for (ILSMComponent c : newerList) {
+ enterComponent(c);
+ }
+ if (newComponent != null) {
+ // Enter new component
+ enterComponent(newComponent);
+ }
+ index.commitTransactionDiskComponent(newComponent);
+ mergePolicy.diskComponentAdded(lsmIndex, fullMergeIsRequested.get());
+ }
+ }
+
+ @Override
+ public void scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
+ throws HyracksDataException {
+ callback.afterFinalize(LSMOperationType.FLUSH, null);
+ }
+
+ @Override
- public void flush(ILSMIndexOperationContext ctx, ILSMIOOperation operation) throws HyracksDataException,
- IndexException {
++ public void flush(ILSMIndexOperationContext ctx, ILSMIOOperation operation)
++ throws HyracksDataException, IndexException {
+ }
+
+ @Override
+ public ILSMOperationTracker getOperationTracker() {
+ return opTracker;
+ }
+
+ public void beforeSubsumeMergedComponents(ILSMComponent newComponent, List<ILSMComponent> mergedComponents)
+ throws HyracksDataException {
+ ITwoPCIndex index = (ITwoPCIndex) lsmIndex;
+ // check if merge will affect the first list
+ if (index.getFirstComponentList().containsAll(mergedComponents)) {
+ // exit un-needed components
+ for (ILSMComponent c : mergedComponents) {
+ exitComponent(c);
+ }
+ // enter new component
+ enterComponent(newComponent);
+ }
+ // check if merge will affect the second list
+ if (index.getSecondComponentList().containsAll(mergedComponents)) {
+ // exit un-needed components
+ for (ILSMComponent c : mergedComponents) {
+ exitComponent(c);
+ }
+ // enter new component
+ enterComponent(newComponent);
+ }
+ }
+
+ // The two methods: enterComponent and exitComponent are used to control
+ // when components are to be deleted from disk
+ private void enterComponent(ILSMComponent diskComponent) throws HyracksDataException {
+ diskComponent.threadEnter(LSMOperationType.SEARCH, false);
+ }
+
+ private void exitComponent(ILSMComponent diskComponent) throws HyracksDataException {
+ diskComponent.threadExit(LSMOperationType.SEARCH, false, false);
+ if (diskComponent.getState() == ILSMComponent.ComponentState.INACTIVE) {
+ if (replicationEnabled) {
+ componentsToBeReplicated.clear();
+ componentsToBeReplicated.add(diskComponent);
+ lsmIndex.scheduleReplication(null, componentsToBeReplicated, false, ReplicationOperation.DELETE, null);
+ }
+ ((AbstractDiskLSMComponent) diskComponent).destroy();
+ }
+ }
+
+ public void indexFirstTimeActivated() throws HyracksDataException {
+ ITwoPCIndex index = (ITwoPCIndex) lsmIndex;
+ // Enter disk components <-- To avoid deleting them when they are
+ // still needed-->
+ for (ILSMComponent c : index.getFirstComponentList()) {
+ enterComponent(c);
+ }
+ for (ILSMComponent c : index.getSecondComponentList()) {
+ enterComponent(c);
+ }
+ }
+
+ public void indexClear() throws HyracksDataException {
+ ITwoPCIndex index = (ITwoPCIndex) lsmIndex;
+ for (ILSMComponent c : index.getFirstComponentList()) {
+ exitComponent(c);
+ }
+ for (ILSMComponent c : index.getSecondComponentList()) {
+ exitComponent(c);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e928b6ac/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
----------------------------------------------------------------------
diff --cc hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index 21b0d8a,0000000..a19532f
mode 100644,000000..100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@@ -1,508 -1,0 +1,526 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.storage.am.lsm.common.impls;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.replication.IReplicationJob.ReplicationOperation;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.storage.am.common.api.IIndexCursor;
+import org.apache.hyracks.storage.am.common.api.ISearchPredicate;
+import org.apache.hyracks.storage.am.common.api.IndexException;
+import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
+import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent.ComponentState;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent.LSMComponentType;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMHarness;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessorInternal;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexInternal;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
+import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
+
+public class LSMHarness implements ILSMHarness {
+ private static final Logger LOGGER = Logger.getLogger(LSMHarness.class.getName());
+
+ protected final ILSMIndexInternal lsmIndex;
+ protected final ILSMMergePolicy mergePolicy;
+ protected final ILSMOperationTracker opTracker;
+ protected final AtomicBoolean fullMergeIsRequested;
+ protected final boolean replicationEnabled;
+ protected List<ILSMComponent> componentsToBeReplicated;
+
+ public LSMHarness(ILSMIndexInternal lsmIndex, ILSMMergePolicy mergePolicy, ILSMOperationTracker opTracker,
+ boolean replicationEnabled) {
+ this.lsmIndex = lsmIndex;
+ this.opTracker = opTracker;
+ this.mergePolicy = mergePolicy;
+ fullMergeIsRequested = new AtomicBoolean();
- this.replicationEnabled = replicationEnabled;
++ //only durable indexes are replicated
++ this.replicationEnabled = replicationEnabled && lsmIndex.isDurable();
+ if (replicationEnabled) {
+ this.componentsToBeReplicated = new ArrayList<ILSMComponent>();
+ }
+ }
+
+ protected boolean getAndEnterComponents(ILSMIndexOperationContext ctx, LSMOperationType opType,
+ boolean isTryOperation) throws HyracksDataException {
++ validateOperationEnterComponentsState(ctx);
+ synchronized (opTracker) {
+ while (true) {
+ lsmIndex.getOperationalComponents(ctx);
+ // Before entering the components, prune those corner cases that indeed should not proceed.
+ switch (opType) {
+ case FLUSH:
+ ILSMComponent flushingComponent = ctx.getComponentHolder().get(0);
+ if (!((AbstractMemoryLSMComponent) flushingComponent).isModified()) {
+ //The mutable component has not been modified by any writer. There is nothing to flush.
+ //since the component is empty, set its state back to READABLE_WRITABLE
+ if (((AbstractLSMIndex) lsmIndex)
+ .getCurrentMutableComponentState() == ComponentState.READABLE_UNWRITABLE) {
+ ((AbstractLSMIndex) lsmIndex)
+ .setCurrentMutableComponentState(ComponentState.READABLE_WRITABLE);
+ }
+ return false;
+ }
+ if (((AbstractMemoryLSMComponent) flushingComponent).getWriterCount() > 0) {
+ /*
+ * This case is a case where even though FLUSH log was flushed to disk and scheduleFlush is triggered,
+ * the current in-memory component (whose state was changed to READABLE_WRITABLE (RW)
+ * from READABLE_UNWRITABLE(RU) before FLUSH log was written to log tail (which is memory buffer of log file)
+ * and then the state was changed back to RW (as shown in the following scenario)) can have writers
+ * based on the current code base/design.
+ * Thus, the writer count of the component may be greater than 0.
+ * if this happens, intead of throwing exception, scheduleFlush() deal with this situation by not flushing
+ * the component.
+ * Please see issue 884 for more detail information:
+ * https://code.google.com/p/asterixdb/issues/detail?id=884&q=owner%3Akisskys%40gmail.com&colspec=ID%20Type%20Status%20Priority%20Milestone%20Owner%20Summary%20ETA%20Severity
+ *
+ */
+ return false;
+ }
+ break;
+ case MERGE:
+ if (ctx.getComponentHolder().size() < 2) {
+ // There is only a single component. There is nothing to merge.
+ return false;
+ }
+ default:
+ break;
+ }
+ if (enterComponents(ctx, opType)) {
+ return true;
+ } else if (isTryOperation) {
+ return false;
+ }
+ try {
+ // Flush and merge operations should never reach this wait call, because they are always try operations.
+ // If they fail to enter the components, then it means that there are an ongoing flush/merge operation on
+ // the same components, so they should not proceed.
+ opTracker.wait();
+ } catch (InterruptedException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ }
+ }
+
+ protected boolean enterComponents(ILSMIndexOperationContext ctx, LSMOperationType opType)
+ throws HyracksDataException {
++ validateOperationEnterComponentsState(ctx);
+ List<ILSMComponent> components = ctx.getComponentHolder();
+ int numEntered = 0;
+ boolean entranceSuccessful = false;
+ try {
+ for (ILSMComponent c : components) {
+ boolean isMutableComponent = numEntered == 0 && c.getType() == LSMComponentType.MEMORY ? true : false;
+ if (!c.threadEnter(opType, isMutableComponent)) {
+ break;
+ }
+ numEntered++;
+ }
+ entranceSuccessful = numEntered == components.size();
+ } catch (Throwable e) {
+ e.printStackTrace();
+ throw e;
+ } finally {
+ if (!entranceSuccessful) {
+ int i = 0;
+ for (ILSMComponent c : components) {
+ if (numEntered == 0) {
+ break;
+ }
+ boolean isMutableComponent = i == 0 && c.getType() == LSMComponentType.MEMORY ? true : false;
+ c.threadExit(opType, true, isMutableComponent);
+ i++;
+ numEntered--;
+ }
+ return false;
+ }
++ ctx.setAccessingComponents(true);
+ }
+ // Check if there is any action that is needed to be taken based on the operation type
+ switch (opType) {
+ case FLUSH:
+ lsmIndex.getIOOperationCallback().beforeOperation(LSMOperationType.FLUSH);
+ // Changing the flush status should *always* precede changing the mutable component.
+ lsmIndex.changeFlushStatusForCurrentMutableCompoent(false);
+ lsmIndex.changeMutableComponent();
+ // Notify all waiting threads whenever a flush has been scheduled since they will check
+ // again if they can grab and enter the mutable component.
+ opTracker.notifyAll();
+ break;
+ case MERGE:
+ lsmIndex.getIOOperationCallback().beforeOperation(LSMOperationType.MERGE);
+ default:
+ break;
+ }
+ opTracker.beforeOperation(lsmIndex, opType, ctx.getSearchOperationCallback(), ctx.getModificationCallback());
+ return true;
+ }
+
+ private void exitComponents(ILSMIndexOperationContext ctx, LSMOperationType opType, ILSMComponent newComponent,
+ boolean failedOperation) throws HyracksDataException, IndexException {
++ /**
++ * FLUSH and MERGE operations should always exit the components
++ * to notify waiting threads.
++ */
++ if (!ctx.isAccessingComponents() && opType != LSMOperationType.FLUSH && opType != LSMOperationType.MERGE) {
++ return;
++ }
+ List<ILSMComponent> inactiveDiskComponents = null;
+ List<ILSMComponent> inactiveDiskComponentsToBeDeleted = null;
+ try {
+ synchronized (opTracker) {
+ try {
+ int i = 0;
+ // First check if there is any action that is needed to be taken based on the state of each component.
+ for (ILSMComponent c : ctx.getComponentHolder()) {
+ boolean isMutableComponent = i == 0 && c.getType() == LSMComponentType.MEMORY ? true : false;
+ c.threadExit(opType, failedOperation, isMutableComponent);
+ if (c.getType() == LSMComponentType.MEMORY) {
+ switch (c.getState()) {
+ case READABLE_UNWRITABLE:
+ if (isMutableComponent && (opType == LSMOperationType.MODIFICATION
+ || opType == LSMOperationType.FORCE_MODIFICATION)) {
+ lsmIndex.changeFlushStatusForCurrentMutableCompoent(true);
+ }
+ break;
+ case INACTIVE:
+ ((AbstractMemoryLSMComponent) c).reset();
+ // Notify all waiting threads whenever the mutable component's has change to inactive. This is important because
+ // even though we switched the mutable components, it is possible that the component that we just switched
+ // to is still busy flushing its data to disk. Thus, the notification that was issued upon scheduling the flush
+ // is not enough.
+ opTracker.notifyAll();
+ break;
+ default:
+ break;
+ }
+ } else {
+ switch (c.getState()) {
+ case INACTIVE:
+ lsmIndex.addInactiveDiskComponent(c);
+ break;
+ default:
+ break;
+ }
+ }
+ i++;
+ }
++ ctx.setAccessingComponents(false);
+ // Then, perform any action that is needed to be taken based on the operation type.
+ switch (opType) {
+ case FLUSH:
+ // newComponent is null if the flush op. was not performed.
+ if (newComponent != null) {
+ lsmIndex.addComponent(newComponent);
+ if (replicationEnabled) {
+ componentsToBeReplicated.clear();
+ componentsToBeReplicated.add(newComponent);
+ triggerReplication(componentsToBeReplicated, false, opType);
+ }
+ mergePolicy.diskComponentAdded(lsmIndex, false);
+ }
+ break;
+ case MERGE:
+ // newComponent is null if the merge op. was not performed.
+ if (newComponent != null) {
+ lsmIndex.subsumeMergedComponents(newComponent, ctx.getComponentHolder());
+ if (replicationEnabled) {
+ componentsToBeReplicated.clear();
+ componentsToBeReplicated.add(newComponent);
+ triggerReplication(componentsToBeReplicated, false, opType);
+ }
+ mergePolicy.diskComponentAdded(lsmIndex, fullMergeIsRequested.get());
+ }
+ break;
+ default:
+ break;
+ }
+ } catch (Throwable e) {
+ e.printStackTrace();
+ throw e;
+ } finally {
+ if (failedOperation && (opType == LSMOperationType.MODIFICATION
+ || opType == LSMOperationType.FORCE_MODIFICATION)) {
+ //When the operation failed, completeOperation() method must be called
+ //in order to decrement active operation count which was incremented in beforeOperation() method.
+ opTracker.completeOperation(lsmIndex, opType, ctx.getSearchOperationCallback(),
+ ctx.getModificationCallback());
+ } else {
+ opTracker.afterOperation(lsmIndex, opType, ctx.getSearchOperationCallback(),
+ ctx.getModificationCallback());
+ }
+
+ /*
+ * = Inactive disk components lazy cleanup if any =
+ * Prepare to cleanup inactive diskComponents which were old merged components
+ * and not anymore accessed.
+ * This cleanup is done outside of optracker synchronized block.
+ */
+ inactiveDiskComponents = lsmIndex.getInactiveDiskComponents();
+ if (!inactiveDiskComponents.isEmpty()) {
+ for (ILSMComponent inactiveComp : inactiveDiskComponents) {
+ if (((AbstractDiskLSMComponent) inactiveComp).getFileReferenceCount() == 1) {
+ if (inactiveDiskComponentsToBeDeleted == null) {
+ inactiveDiskComponentsToBeDeleted = new LinkedList<ILSMComponent>();
+ }
+ inactiveDiskComponentsToBeDeleted.add(inactiveComp);
+ }
+ }
+ if (inactiveDiskComponentsToBeDeleted != null) {
+ inactiveDiskComponents.removeAll(inactiveDiskComponentsToBeDeleted);
+ }
+ }
+ }
+ }
+ } finally {
+ /*
+ * cleanup inactive disk components if any
+ */
+ if (inactiveDiskComponentsToBeDeleted != null) {
+ try {
+ //schedule a replication job to delete these inactive disk components from replicas
+ if (replicationEnabled) {
+ lsmIndex.scheduleReplication(null, inactiveDiskComponentsToBeDeleted, false,
+ ReplicationOperation.DELETE, opType);
+ }
+
+ for (ILSMComponent c : inactiveDiskComponentsToBeDeleted) {
+ ((AbstractDiskLSMComponent) c).destroy();
+ }
+ } catch (Throwable e) {
+ e.printStackTrace();
+ throw e;
+ }
+ }
+ }
+
+ }
+
+ @Override
+ public void forceModify(ILSMIndexOperationContext ctx, ITupleReference tuple)
+ throws HyracksDataException, IndexException {
+ LSMOperationType opType = LSMOperationType.FORCE_MODIFICATION;
+ modify(ctx, false, tuple, opType);
+ }
+
+ @Override
+ public boolean modify(ILSMIndexOperationContext ctx, boolean tryOperation, ITupleReference tuple)
+ throws HyracksDataException, IndexException {
+ LSMOperationType opType = LSMOperationType.MODIFICATION;
+ return modify(ctx, tryOperation, tuple, opType);
+ }
+
+ private boolean modify(ILSMIndexOperationContext ctx, boolean tryOperation, ITupleReference tuple,
+ LSMOperationType opType) throws HyracksDataException, IndexException {
+ if (!lsmIndex.isMemoryComponentsAllocated()) {
+ lsmIndex.allocateMemoryComponents();
+ }
+ boolean failedOperation = false;
+ if (!getAndEnterComponents(ctx, opType, tryOperation)) {
+ return false;
+ }
+ try {
+ lsmIndex.modify(ctx, tuple);
+ // The mutable component is always in the first index.
+ AbstractMemoryLSMComponent mutableComponent = (AbstractMemoryLSMComponent) ctx.getComponentHolder().get(0);
+ mutableComponent.setIsModified();
+ } catch (Exception e) {
+ failedOperation = true;
+ throw e;
+ } finally {
+ exitComponents(ctx, opType, null, failedOperation);
+ }
+ return true;
+ }
+
+ @Override
+ public void search(ILSMIndexOperationContext ctx, IIndexCursor cursor, ISearchPredicate pred)
+ throws HyracksDataException, IndexException {
+ LSMOperationType opType = LSMOperationType.SEARCH;
+ ctx.setSearchPredicate(pred);
+ getAndEnterComponents(ctx, opType, false);
+ try {
+ ctx.getSearchOperationCallback().before(pred.getLowKey());
+ lsmIndex.search(ctx, cursor, pred);
+ } catch (HyracksDataException | IndexException e) {
+ exitComponents(ctx, opType, null, true);
+ throw e;
+ }
+ }
+
+ @Override
+ public void endSearch(ILSMIndexOperationContext ctx) throws HyracksDataException {
+ if (ctx.getOperation() == IndexOperation.SEARCH) {
+ try {
+ exitComponents(ctx, LSMOperationType.SEARCH, null, false);
+ } catch (IndexException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+ }
+
+ @Override
+ public void scheduleFlush(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
+ throws HyracksDataException {
+ if (!getAndEnterComponents(ctx, LSMOperationType.FLUSH, true)) {
+ callback.afterFinalize(LSMOperationType.FLUSH, null);
+ return;
+ }
+ lsmIndex.scheduleFlush(ctx, callback);
+ }
+
+ @Override
+ public void flush(ILSMIndexOperationContext ctx, ILSMIOOperation operation)
+ throws HyracksDataException, IndexException {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Started a flush operation for index: " + lsmIndex + " ...");
+ }
+
+ ILSMComponent newComponent = null;
+ try {
+ newComponent = lsmIndex.flush(operation);
+ operation.getCallback().afterOperation(LSMOperationType.FLUSH, null, newComponent);
+ lsmIndex.markAsValid(newComponent);
+ } catch (Throwable e) {
+ e.printStackTrace();
+ throw e;
+ } finally {
+ exitComponents(ctx, LSMOperationType.FLUSH, newComponent, false);
+ operation.getCallback().afterFinalize(LSMOperationType.FLUSH, newComponent);
+ }
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Finished the flush operation for index: " + lsmIndex);
+ }
+ }
+
+ @Override
+ public void scheduleMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
+ throws HyracksDataException, IndexException {
+ if (!getAndEnterComponents(ctx, LSMOperationType.MERGE, true)) {
+ callback.afterFinalize(LSMOperationType.MERGE, null);
+ return;
+ }
+ lsmIndex.scheduleMerge(ctx, callback);
+ }
+
+ @Override
+ public void scheduleFullMerge(ILSMIndexOperationContext ctx, ILSMIOOperationCallback callback)
+ throws HyracksDataException, IndexException {
+ fullMergeIsRequested.set(true);
+ if (!getAndEnterComponents(ctx, LSMOperationType.MERGE, true)) {
+ // If the merge cannot be scheduled because there is already an ongoing merge on subset/all of the components, then
+ // whenever the current merge has finished, it will schedule the full merge again.
+ callback.afterFinalize(LSMOperationType.MERGE, null);
+ return;
+ }
+ fullMergeIsRequested.set(false);
+ lsmIndex.scheduleMerge(ctx, callback);
+ }
+
+ @Override
+ public void merge(ILSMIndexOperationContext ctx, ILSMIOOperation operation)
+ throws HyracksDataException, IndexException {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Started a merge operation for index: " + lsmIndex + " ...");
+ }
+
+ ILSMComponent newComponent = null;
+ try {
+ newComponent = lsmIndex.merge(operation);
+ operation.getCallback().afterOperation(LSMOperationType.MERGE, ctx.getComponentHolder(), newComponent);
+ lsmIndex.markAsValid(newComponent);
+ } catch (Throwable e) {
+ e.printStackTrace();
+ throw e;
+ } finally {
+ exitComponents(ctx, LSMOperationType.MERGE, newComponent, false);
+ operation.getCallback().afterFinalize(LSMOperationType.MERGE, newComponent);
+ }
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Finished the merge operation for index: " + lsmIndex);
+ }
+ }
+
+ @Override
+ public void addBulkLoadedComponent(ILSMComponent c) throws HyracksDataException, IndexException {
+ lsmIndex.markAsValid(c);
+ synchronized (opTracker) {
+ lsmIndex.addComponent(c);
+ if (replicationEnabled) {
+ componentsToBeReplicated.clear();
+ componentsToBeReplicated.add(c);
+ triggerReplication(componentsToBeReplicated, true, LSMOperationType.MERGE);
+ }
+ mergePolicy.diskComponentAdded(lsmIndex, false);
+ }
+ }
+
+ @Override
+ public ILSMOperationTracker getOperationTracker() {
+ return opTracker;
+ }
+
+ protected void triggerReplication(List<ILSMComponent> lsmComponents, boolean bulkload, LSMOperationType opType)
+ throws HyracksDataException {
+ ILSMIndexAccessorInternal accessor = lsmIndex.createAccessor(NoOpOperationCallback.INSTANCE,
+ NoOpOperationCallback.INSTANCE);
+ accessor.scheduleReplication(lsmComponents, bulkload, opType);
+ }
+
+ @Override
+ public void scheduleReplication(ILSMIndexOperationContext ctx, List<ILSMComponent> lsmComponents, boolean bulkload,
+ LSMOperationType opType) throws HyracksDataException {
+
+ //enter the LSM components to be replicated to prevent them from being deleted until they are replicated
+ if (!getAndEnterComponents(ctx, LSMOperationType.REPLICATE, false)) {
+ return;
+ }
+
+ lsmIndex.scheduleReplication(ctx, lsmComponents, bulkload, ReplicationOperation.REPLICATE, opType);
+ }
+
+ @Override
+ public void endReplication(ILSMIndexOperationContext ctx) throws HyracksDataException {
+ try {
+ exitComponents(ctx, LSMOperationType.REPLICATE, null, false);
+ } catch (IndexException e) {
+ throw new HyracksDataException(e);
+ }
+ }
++
++ protected void validateOperationEnterComponentsState(ILSMIndexOperationContext ctx) throws HyracksDataException {
++ if (ctx.isAccessingComponents()) {
++ throw new HyracksDataException("Opeartion already has access to components of index " + lsmIndex);
++ }
++ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e928b6ac/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexSearchCursor.java
----------------------------------------------------------------------
diff --cc hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexSearchCursor.java
index c4d2fcc,0000000..befdd85
mode 100644,000000..100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexSearchCursor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMIndexSearchCursor.java
@@@ -1,306 -1,0 +1,308 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.storage.am.lsm.common.impls;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.PriorityQueue;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.storage.am.common.api.IIndexCursor;
+import org.apache.hyracks.storage.am.common.api.ITreeIndexCursor;
+import org.apache.hyracks.storage.am.common.api.IndexException;
+import org.apache.hyracks.storage.am.common.ophelpers.MultiComparator;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMHarness;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMTreeTupleReference;
+import org.apache.hyracks.storage.common.buffercache.IBufferCache;
+import org.apache.hyracks.storage.common.buffercache.ICachedPage;
+
+public abstract class LSMIndexSearchCursor implements ITreeIndexCursor {
+ protected final ILSMIndexOperationContext opCtx;
+ protected final boolean returnDeletedTuples;
+ protected PriorityQueueElement outputElement;
+ protected IIndexCursor[] rangeCursors;
+ protected PriorityQueueElement[] pqes;
+ protected PriorityQueue<PriorityQueueElement> outputPriorityQueue;
+ protected PriorityQueueComparator pqCmp;
+ protected MultiComparator cmp;
+ protected boolean needPush;
+ protected boolean includeMutableComponent;
+ protected ILSMHarness lsmHarness;
+
+ protected List<ILSMComponent> operationalComponents;
+
+ public LSMIndexSearchCursor(ILSMIndexOperationContext opCtx, boolean returnDeletedTuples) {
+ this.opCtx = opCtx;
+ this.returnDeletedTuples = returnDeletedTuples;
+ outputElement = null;
+ needPush = false;
+ }
+
+ public ILSMIndexOperationContext getOpCtx() {
+ return opCtx;
+ }
+
+ public void initPriorityQueue() throws HyracksDataException, IndexException {
+ int pqInitSize = (rangeCursors.length > 0) ? rangeCursors.length : 1;
+ if (outputPriorityQueue == null) {
+ outputPriorityQueue = new PriorityQueue<PriorityQueueElement>(pqInitSize, pqCmp);
+ pqes = new PriorityQueueElement[pqInitSize];
+ for (int i = 0; i < pqInitSize; i++) {
+ pqes[i] = new PriorityQueueElement(i);
+ }
+ for (int i = 0; i < rangeCursors.length; i++) {
+ pushIntoPriorityQueue(pqes[i]);
+ }
+ } else {
+ outputPriorityQueue.clear();
+ // did size change?
+ if (pqInitSize == pqes.length) {
+ // size is the same -> re-use
+ for (int i = 0; i < rangeCursors.length; i++) {
+ pqes[i].reset(null);
+ pushIntoPriorityQueue(pqes[i]);
+ }
+ } else {
+ // size changed (due to flushes, merges, etc) -> re-create
+ pqes = new PriorityQueueElement[pqInitSize];
+ for (int i = 0; i < rangeCursors.length; i++) {
+ pqes[i] = new PriorityQueueElement(i);
+ pushIntoPriorityQueue(pqes[i]);
+ }
+ }
+ }
+ }
+
+ public IIndexCursor getCursor(int cursorIndex) {
+ return rangeCursors[cursorIndex];
+ }
+
+ @Override
+ public void reset() throws HyracksDataException, IndexException {
+ outputElement = null;
+ needPush = false;
+
+ try {
+ if (outputPriorityQueue != null) {
+ outputPriorityQueue.clear();
+ }
+
+ if (rangeCursors != null) {
+ for (int i = 0; i < rangeCursors.length; i++) {
+ rangeCursors[i].reset();
+ }
+ }
+ rangeCursors = null;
+ } finally {
+ if (lsmHarness != null) {
+ lsmHarness.endSearch(opCtx);
+ }
+ }
+ }
+
+ @Override
+ public boolean hasNext() throws HyracksDataException, IndexException {
+ checkPriorityQueue();
+ return !outputPriorityQueue.isEmpty();
+ }
+
+ @Override
+ public void next() throws HyracksDataException {
+ outputElement = outputPriorityQueue.poll();
+ needPush = true;
+ }
+
+ @Override
+ public ICachedPage getPage() {
+ // do nothing
+ return null;
+ }
+
+ @Override
+ public void close() throws HyracksDataException {
+ try {
- outputPriorityQueue.clear();
++ if (outputPriorityQueue != null) {
++ outputPriorityQueue.clear();
++ }
+ for (int i = 0; i < rangeCursors.length; i++) {
+ rangeCursors[i].close();
+ }
+ rangeCursors = null;
+ } finally {
+ if (lsmHarness != null) {
+ lsmHarness.endSearch(opCtx);
+ }
+ }
+ }
+
+ @Override
+ public void setBufferCache(IBufferCache bufferCache) {
+ // do nothing
+ }
+
+ @Override
+ public void setFileId(int fileId) {
+ // do nothing
+ }
+
+ @Override
+ public ITupleReference getTuple() {
+ return outputElement.getTuple();
+ }
+
+ protected boolean pushIntoPriorityQueue(PriorityQueueElement e) throws HyracksDataException, IndexException {
+ int cursorIndex = e.getCursorIndex();
+ if (rangeCursors[cursorIndex].hasNext()) {
+ rangeCursors[cursorIndex].next();
+ e.reset(rangeCursors[cursorIndex].getTuple());
+ outputPriorityQueue.offer(e);
+ return true;
+ }
+ rangeCursors[cursorIndex].close();
+ return false;
+ }
+
+ protected boolean isDeleted(PriorityQueueElement checkElement) throws HyracksDataException, IndexException {
+ return ((ILSMTreeTupleReference) checkElement.getTuple()).isAntimatter();
+ }
+
+ protected void checkPriorityQueue() throws HyracksDataException, IndexException {
+ while (!outputPriorityQueue.isEmpty() || (needPush == true)) {
+ if (!outputPriorityQueue.isEmpty()) {
+ PriorityQueueElement checkElement = outputPriorityQueue.peek();
+ // If there is no previous tuple or the previous tuple can be ignored
+ if (outputElement == null) {
+ if (isDeleted(checkElement) && !returnDeletedTuples) {
+ // If the key has been deleted then pop it and set needPush to true.
+ // We cannot push immediately because the tuple may be
+ // modified if hasNext() is called
+ outputElement = outputPriorityQueue.poll();
+ needPush = true;
+ } else {
+ break;
+ }
+ } else {
+ // Compare the previous tuple and the head tuple in the PQ
+ if (compare(cmp, outputElement.getTuple(), checkElement.getTuple()) == 0) {
+ // If the previous tuple and the head tuple are
+ // identical
+ // then pop the head tuple and push the next tuple from
+ // the tree of head tuple
+
+ // the head element of PQ is useless now
+ PriorityQueueElement e = outputPriorityQueue.poll();
+ pushIntoPriorityQueue(e);
+ } else {
+ // If the previous tuple and the head tuple are different
+ // the info of previous tuple is useless
+ if (needPush == true) {
+ pushIntoPriorityQueue(outputElement);
+ needPush = false;
+ }
+ outputElement = null;
+ }
+ }
+ } else {
+ // the priority queue is empty and needPush
+ pushIntoPriorityQueue(outputElement);
+ needPush = false;
+ outputElement = null;
+ }
+ }
+ }
+
+ @Override
+ public boolean exclusiveLatchNodes() {
+ return false;
+ }
+
+ public class PriorityQueueElement {
+ private ITupleReference tuple;
+ private int cursorIndex;
+
+ public PriorityQueueElement(int cursorIndex) {
+ tuple = null;
+ this.cursorIndex = cursorIndex;
+ }
+
+ public ITupleReference getTuple() {
+ return tuple;
+ }
+
+ public int getCursorIndex() {
+ return cursorIndex;
+ }
+
+ public void reset(ITupleReference tuple) {
+ this.tuple = tuple;
+ }
+ }
+
+ public class PriorityQueueComparator implements Comparator<PriorityQueueElement> {
+
+ protected MultiComparator cmp;
+
+ public PriorityQueueComparator(MultiComparator cmp) {
+ this.cmp = cmp;
+ }
+
+ @Override
+ public int compare(PriorityQueueElement elementA, PriorityQueueElement elementB) {
+ int result;
+ try {
+ result = cmp.compare(elementA.getTuple(), elementB.getTuple());
+ if (result != 0) {
+ return result;
+ }
+ } catch (HyracksDataException e) {
+ throw new IllegalArgumentException(e);
+ }
+
+ if (elementA.getCursorIndex() > elementB.getCursorIndex()) {
+ return 1;
+ } else {
+ return -1;
+ }
+ }
+
+ public MultiComparator getMultiComparator() {
+ return cmp;
+ }
+ }
+
+ protected void setPriorityQueueComparator() {
+ if (pqCmp == null || cmp != pqCmp.getMultiComparator()) {
+ pqCmp = new PriorityQueueComparator(cmp);
+ }
+ }
+
+ protected int compare(MultiComparator cmp, ITupleReference tupleA, ITupleReference tupleB)
+ throws HyracksDataException {
+ return cmp.compare(tupleA, tupleB);
+ }
+
+ @Override
+ public void markCurrentTupleAsUpdated() throws HyracksDataException {
+ throw new HyracksDataException("Updating tuples is not supported with this cursor.");
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e928b6ac/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexOpContext.java
----------------------------------------------------------------------
diff --cc hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexOpContext.java
index c511a67,0000000..828e296
mode 100644,000000..100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexOpContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexOpContext.java
@@@ -1,169 -1,0 +1,170 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.storage.am.lsm.invertedindex.impls;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.common.api.IIndexAccessor;
+import org.apache.hyracks.storage.am.common.api.IModificationOperationCallback;
+import org.apache.hyracks.storage.am.common.api.ISearchOperationCallback;
+import org.apache.hyracks.storage.am.common.api.ISearchPredicate;
+import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
+import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
+import org.apache.hyracks.storage.am.common.ophelpers.MultiComparator;
+import org.apache.hyracks.storage.am.common.tuples.PermutingTupleReference;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
- import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
++import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexOperationContext;
+import org.apache.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndexAccessor;
+
- public class LSMInvertedIndexOpContext implements ILSMIndexOperationContext {
++public class LSMInvertedIndexOpContext extends AbstractLSMIndexOperationContext {
+
+ private static final int NUM_DOCUMENT_FIELDS = 1;
+
+ private IndexOperation op;
+ private final List<ILSMComponent> componentHolder;
+ private final List<ILSMComponent> componentsToBeMerged;
+ private final List<ILSMComponent> componentsToBeReplicated;
+
+ private IModificationOperationCallback modificationCallback;
+ private ISearchOperationCallback searchCallback;
+
+ // Tuple that only has the inverted-index elements (aka keys), projecting away the document fields.
+ public PermutingTupleReference keysOnlyTuple;
+
+ // Accessor to the in-memory inverted indexes.
+ public IInvertedIndexAccessor[] mutableInvIndexAccessors;
+ // Accessor to the deleted-keys BTrees.
+ public IIndexAccessor[] deletedKeysBTreeAccessors;
+
+ public IInvertedIndexAccessor currentMutableInvIndexAccessors;
+ public IIndexAccessor currentDeletedKeysBTreeAccessors;
+
+ public final PermutingTupleReference indexTuple;
+ public final MultiComparator filterCmp;
+ public final PermutingTupleReference filterTuple;
+
+ public ISearchPredicate searchPredicate;
+
+ public LSMInvertedIndexOpContext(List<ILSMComponent> mutableComponents,
+ IModificationOperationCallback modificationCallback, ISearchOperationCallback searchCallback,
+ int[] invertedIndexFields, int[] filterFields) throws HyracksDataException {
+ this.componentHolder = new LinkedList<ILSMComponent>();
+ this.componentsToBeMerged = new LinkedList<ILSMComponent>();
+ this.componentsToBeReplicated = new LinkedList<ILSMComponent>();
+ this.modificationCallback = modificationCallback;
+ this.searchCallback = searchCallback;
+
+ mutableInvIndexAccessors = new IInvertedIndexAccessor[mutableComponents.size()];
+ deletedKeysBTreeAccessors = new IIndexAccessor[mutableComponents.size()];
+
+ for (int i = 0; i < mutableComponents.size(); i++) {
+ LSMInvertedIndexMemoryComponent mutableComponent = (LSMInvertedIndexMemoryComponent) mutableComponents
+ .get(i);
+ mutableInvIndexAccessors[i] = (IInvertedIndexAccessor) mutableComponent.getInvIndex()
+ .createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+ deletedKeysBTreeAccessors[i] = mutableComponent.getDeletedKeysBTree()
+ .createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+ }
+
+ assert mutableComponents.size() > 0;
+
+ // Project away the document fields, leaving only the key fields.
+ LSMInvertedIndexMemoryComponent c = (LSMInvertedIndexMemoryComponent) mutableComponents.get(0);
+ int numKeyFields = c.getInvIndex().getInvListTypeTraits().length;
+ int[] keyFieldPermutation = new int[numKeyFields];
+ for (int i = 0; i < numKeyFields; i++) {
+ keyFieldPermutation[i] = NUM_DOCUMENT_FIELDS + i;
+ }
+ keysOnlyTuple = new PermutingTupleReference(keyFieldPermutation);
+
+ if (filterFields != null) {
+ indexTuple = new PermutingTupleReference(invertedIndexFields);
+ filterCmp = MultiComparator.create(c.getLSMComponentFilter().getFilterCmpFactories());
+ filterTuple = new PermutingTupleReference(filterFields);
+ } else {
+ indexTuple = null;
+ filterCmp = null;
+ filterTuple = null;
+ }
+ }
+
+ @Override
+ public void reset() {
++ super.reset();
+ componentHolder.clear();
+ componentsToBeMerged.clear();
+ componentsToBeReplicated.clear();
+ }
+
+ @Override
+ // TODO: Ignore opcallback for now.
+ public void setOperation(IndexOperation newOp) throws HyracksDataException {
+ reset();
+ op = newOp;
+ }
+
+ @Override
+ public IndexOperation getOperation() {
+ return op;
+ }
+
+ @Override
+ public List<ILSMComponent> getComponentHolder() {
+ return componentHolder;
+ }
+
+ @Override
+ public ISearchOperationCallback getSearchOperationCallback() {
+ return searchCallback;
+ }
+
+ @Override
+ public IModificationOperationCallback getModificationCallback() {
+ return modificationCallback;
+ }
+
+ @Override
+ public void setCurrentMutableComponentId(int currentMutableComponentId) {
+ currentMutableInvIndexAccessors = mutableInvIndexAccessors[currentMutableComponentId];
+ currentDeletedKeysBTreeAccessors = deletedKeysBTreeAccessors[currentMutableComponentId];
+ }
+
+ @Override
+ public List<ILSMComponent> getComponentsToBeMerged() {
+ return componentsToBeMerged;
+ }
+
+ @Override
+ public void setSearchPredicate(ISearchPredicate searchPredicate) {
+ this.searchPredicate = searchPredicate;
+ }
+
+ @Override
+ public ISearchPredicate getSearchPredicate() {
+ return searchPredicate;
+ }
+
+ @Override
+ public List<ILSMComponent> getComponentsToBeReplicated() {
+ return componentsToBeReplicated;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e928b6ac/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/ExternalRTreeOpContext.java
----------------------------------------------------------------------
diff --cc hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/ExternalRTreeOpContext.java
index 6a9a640,0000000..358a42a
mode 100644,000000..100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/ExternalRTreeOpContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/ExternalRTreeOpContext.java
@@@ -1,134 -1,0 +1,135 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.lsm.rtree.impls;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.storage.am.common.api.IModificationOperationCallback;
+import org.apache.hyracks.storage.am.common.api.ISearchOperationCallback;
+import org.apache.hyracks.storage.am.common.api.ISearchPredicate;
+import org.apache.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
+import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
+import org.apache.hyracks.storage.am.common.ophelpers.MultiComparator;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMHarness;
- import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
++import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexOperationContext;
+
- public class ExternalRTreeOpContext implements ILSMIndexOperationContext {
++public class ExternalRTreeOpContext extends AbstractLSMIndexOperationContext {
+ private IndexOperation op;
+ private MultiComparator bTreeCmp;
+ private MultiComparator rTreeCmp;
+ public final List<ILSMComponent> componentHolder;
+ private final List<ILSMComponent> componentsToBeMerged;
+ private final List<ILSMComponent> componentsToBeReplicated;
+ public final ISearchOperationCallback searchCallback;
+ private final int targetIndexVersion;
+ public ISearchPredicate searchPredicate;
+ public LSMRTreeCursorInitialState initialState;
+
+ public ExternalRTreeOpContext(IBinaryComparatorFactory[] rtreeCmpFactories,
+ IBinaryComparatorFactory[] btreeCmpFactories, ISearchOperationCallback searchCallback,
+ int targetIndexVersion, ILSMHarness lsmHarness, int[] comparatorFields,
+ IBinaryComparatorFactory[] linearizerArray, ITreeIndexFrameFactory rtreeLeafFrameFactory,
+ ITreeIndexFrameFactory rtreeInteriorFrameFactory, ITreeIndexFrameFactory btreeLeafFrameFactory) {
+ this.componentHolder = new LinkedList<ILSMComponent>();
+ this.componentsToBeMerged = new LinkedList<ILSMComponent>();
+ this.componentsToBeReplicated = new LinkedList<ILSMComponent>();
+ this.searchCallback = searchCallback;
+ this.targetIndexVersion = targetIndexVersion;
+ this.bTreeCmp = MultiComparator.create(btreeCmpFactories);
+ this.rTreeCmp = MultiComparator.create(rtreeCmpFactories);
+ initialState = new LSMRTreeCursorInitialState(rtreeLeafFrameFactory, rtreeInteriorFrameFactory,
+ btreeLeafFrameFactory, bTreeCmp, lsmHarness, comparatorFields, linearizerArray, searchCallback,
+ componentHolder);
+ }
+
+ @Override
+ public void setOperation(IndexOperation newOp) {
+ reset();
+ this.op = newOp;
+ }
+
+ @Override
+ public void setCurrentMutableComponentId(int currentMutableComponentId) {
+ // Do nothing. this should never be called for disk only indexes
+ }
+
+ @Override
+ public void reset() {
++ super.reset();
+ componentHolder.clear();
+ componentsToBeMerged.clear();
+ componentsToBeReplicated.clear();
+ }
+
+ @Override
+ public IndexOperation getOperation() {
+ return op;
+ }
+
+ public MultiComparator getBTreeMultiComparator() {
+ return bTreeCmp;
+ }
+
+ public MultiComparator getRTreeMultiComparator() {
+ return rTreeCmp;
+ }
+
+ @Override
+ public List<ILSMComponent> getComponentHolder() {
+ return componentHolder;
+ }
+
+ @Override
+ public ISearchOperationCallback getSearchOperationCallback() {
+ return searchCallback;
+ }
+
+ // This should never be needed for disk only indexes
+ @Override
+ public IModificationOperationCallback getModificationCallback() {
+ return null;
+ }
+
+ @Override
+ public List<ILSMComponent> getComponentsToBeMerged() {
+ return componentsToBeMerged;
+ }
+
+ public int getTargetIndexVersion() {
+ return targetIndexVersion;
+ }
+
+ @Override
+ public void setSearchPredicate(ISearchPredicate searchPredicate) {
+ this.searchPredicate = searchPredicate;
+ }
+
+ @Override
+ public ISearchPredicate getSearchPredicate() {
+ return searchPredicate;
+ }
+
+ @Override
+ public List<ILSMComponent> getComponentsToBeReplicated() {
+ return componentsToBeReplicated;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/e928b6ac/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeOpContext.java
----------------------------------------------------------------------
diff --cc hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeOpContext.java
index 686cd2b,0000000..62f572f
mode 100644,000000..100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeOpContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeOpContext.java
@@@ -1,181 -1,0 +1,182 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.storage.am.lsm.rtree.impls;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.storage.am.btree.impls.BTree;
+import org.apache.hyracks.storage.am.btree.impls.BTreeOpContext;
+import org.apache.hyracks.storage.am.common.api.IModificationOperationCallback;
+import org.apache.hyracks.storage.am.common.api.ISearchOperationCallback;
+import org.apache.hyracks.storage.am.common.api.ISearchPredicate;
+import org.apache.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
+import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
+import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
+import org.apache.hyracks.storage.am.common.ophelpers.MultiComparator;
+import org.apache.hyracks.storage.am.common.tuples.PermutingTupleReference;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMHarness;
- import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
++import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexOperationContext;
+import org.apache.hyracks.storage.am.rtree.impls.RTree;
+import org.apache.hyracks.storage.am.rtree.impls.RTreeOpContext;
+
- public final class LSMRTreeOpContext implements ILSMIndexOperationContext {
++public final class LSMRTreeOpContext extends AbstractLSMIndexOperationContext {
+
+ public RTree.RTreeAccessor[] mutableRTreeAccessors;
+ public RTree.RTreeAccessor currentMutableRTreeAccessor;
+ public BTree.BTreeAccessor[] mutableBTreeAccessors;
+ public BTree.BTreeAccessor currentMutableBTreeAccessor;
+
+ public RTreeOpContext[] rtreeOpContexts;
+ public BTreeOpContext[] btreeOpContexts;
+ public RTreeOpContext currentRTreeOpContext;
+ public BTreeOpContext currentBTreeOpContext;
+
+ private IndexOperation op;
+ public final List<ILSMComponent> componentHolder;
+ private final List<ILSMComponent> componentsToBeMerged;
+ private final List<ILSMComponent> componentsToBeReplicated;
+ private IModificationOperationCallback modificationCallback;
+ private ISearchOperationCallback searchCallback;
+ public final PermutingTupleReference indexTuple;
+ public final MultiComparator filterCmp;
+ public final PermutingTupleReference filterTuple;
+ public ISearchPredicate searchPredicate;
+ public LSMRTreeCursorInitialState searchInitialState;
+
+ public LSMRTreeOpContext(List<ILSMComponent> mutableComponents, ITreeIndexFrameFactory rtreeLeafFrameFactory,
+ ITreeIndexFrameFactory rtreeInteriorFrameFactory, ITreeIndexFrameFactory btreeLeafFrameFactory,
+ ITreeIndexFrameFactory btreeInteriorFrameFactory, IBinaryComparatorFactory[] rtreeCmpFactories,
+ IBinaryComparatorFactory[] btreeCmpFactories, IModificationOperationCallback modificationCallback,
+ ISearchOperationCallback searchCallback, int[] rtreeFields, int[] filterFields, ILSMHarness lsmHarness,
+ int[] comparatorFields, IBinaryComparatorFactory[] linearizerArray) {
+ mutableRTreeAccessors = new RTree.RTreeAccessor[mutableComponents.size()];
+ mutableBTreeAccessors = new BTree.BTreeAccessor[mutableComponents.size()];
+ rtreeOpContexts = new RTreeOpContext[mutableComponents.size()];
+ btreeOpContexts = new BTreeOpContext[mutableComponents.size()];
+
+ LSMRTreeMemoryComponent c = (LSMRTreeMemoryComponent) mutableComponents.get(0);
+
+ for (int i = 0; i < mutableComponents.size(); i++) {
+ LSMRTreeMemoryComponent mutableComponent = (LSMRTreeMemoryComponent) mutableComponents.get(i);
+ mutableRTreeAccessors[i] = (RTree.RTreeAccessor) mutableComponent.getRTree()
+ .createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+ mutableBTreeAccessors[i] = (BTree.BTreeAccessor) mutableComponent.getBTree()
+ .createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
+
+ rtreeOpContexts[i] = mutableRTreeAccessors[i].getOpContext();
+ btreeOpContexts[i] = mutableBTreeAccessors[i].getOpContext();
+ }
+
+ assert mutableComponents.size() > 0;
+ currentRTreeOpContext = rtreeOpContexts[0];
+ currentBTreeOpContext = btreeOpContexts[0];
+ this.componentHolder = new LinkedList<ILSMComponent>();
+ this.componentsToBeMerged = new LinkedList<ILSMComponent>();
+ this.componentsToBeReplicated = new LinkedList<ILSMComponent>();
+ this.modificationCallback = modificationCallback;
+ this.searchCallback = searchCallback;
+
+ if (filterFields != null) {
+ indexTuple = new PermutingTupleReference(rtreeFields);
+ filterCmp = MultiComparator.create(c.getLSMComponentFilter().getFilterCmpFactories());
+ filterTuple = new PermutingTupleReference(filterFields);
+ } else {
+ indexTuple = null;
+ filterCmp = null;
+ filterTuple = null;
+ }
+ searchInitialState = new LSMRTreeCursorInitialState(rtreeLeafFrameFactory, rtreeInteriorFrameFactory,
+ btreeLeafFrameFactory, getBTreeMultiComparator(), lsmHarness, comparatorFields, linearizerArray,
+ searchCallback, componentHolder);
+ }
+
+ @Override
+ public void setOperation(IndexOperation newOp) {
+ reset();
+ this.op = newOp;
+ }
+
+ @Override
+ public void setCurrentMutableComponentId(int currentMutableComponentId) {
+ currentMutableRTreeAccessor = mutableRTreeAccessors[currentMutableComponentId];
+ currentMutableBTreeAccessor = mutableBTreeAccessors[currentMutableComponentId];
+ currentRTreeOpContext = rtreeOpContexts[currentMutableComponentId];
+ currentBTreeOpContext = btreeOpContexts[currentMutableComponentId];
+ if (op == IndexOperation.INSERT) {
+ currentRTreeOpContext.setOperation(op);
+ } else if (op == IndexOperation.DELETE) {
+ currentBTreeOpContext.setOperation(IndexOperation.INSERT);
+ }
+ }
+
+ @Override
+ public void reset() {
++ super.reset();
+ componentHolder.clear();
+ componentsToBeMerged.clear();
+ }
+
+ @Override
+ public IndexOperation getOperation() {
+ return op;
+ }
+
+ public MultiComparator getBTreeMultiComparator() {
+ return currentBTreeOpContext.cmp;
+ }
+
+ @Override
+ public List<ILSMComponent> getComponentHolder() {
+ return componentHolder;
+ }
+
+ @Override
+ public ISearchOperationCallback getSearchOperationCallback() {
+ return searchCallback;
+ }
+
+ @Override
+ public IModificationOperationCallback getModificationCallback() {
+ return modificationCallback;
+ }
+
+ @Override
+ public List<ILSMComponent> getComponentsToBeMerged() {
+ return componentsToBeMerged;
+ }
+
+ @Override
+ public void setSearchPredicate(ISearchPredicate searchPredicate) {
+ this.searchPredicate = searchPredicate;
+ }
+
+ @Override
+ public ISearchPredicate getSearchPredicate() {
+ return searchPredicate;
+ }
+
+ @Override
+ public List<ILSMComponent> getComponentsToBeReplicated() {
+ return componentsToBeReplicated;
+ }
+}