You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by am...@apache.org on 2016/11/24 06:30:15 UTC
[11/12] asterixdb git commit: Cleanup FileSplit and FileReference
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/163c3be3/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
index 615e8af..5816450 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
@@ -25,9 +25,8 @@ import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartit
import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.FileSplit;
import org.apache.hyracks.dataflow.std.file.ConstantFileSplitProvider;
-import org.apache.hyracks.dataflow.std.file.FileSplit;
import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
@@ -47,12 +46,11 @@ public class StoragePathUtil {
loc[p] = splits[p].getNodeName();
}
AlgebricksPartitionConstraint pc = new AlgebricksAbsolutePartitionConstraint(loc);
- return new Pair<IFileSplitProvider, AlgebricksPartitionConstraint>(splitProvider, pc);
+ return new Pair<>(splitProvider, pc);
}
- public static FileSplit getFileSplitForClusterPartition(ClusterPartition partition, File relativeFile) {
- return new FileSplit(partition.getActiveNodeId(), new FileReference(relativeFile), partition.getIODeviceNum(),
- partition.getPartitionId());
+ public static FileSplit getFileSplitForClusterPartition(ClusterPartition partition, String relativePath) {
+ return new FileSplit(partition.getActiveNodeId(), relativePath, true);
}
public static String prepareStoragePartitionPath(String storageDirName, int partitonId) {
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/163c3be3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
index 275825b..4853129 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
@@ -47,7 +47,7 @@ import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartit
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.dataflow.std.file.FileSplit;
+import org.apache.hyracks.api.io.FileSplit;
public class GenericAdapterFactory implements IIndexingAdapterFactory, IAdapterFactory {
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/163c3be3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/ExternalFileIndexAccessor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/ExternalFileIndexAccessor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/ExternalFileIndexAccessor.java
index 667bae7..42e11b1 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/ExternalFileIndexAccessor.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/indexing/ExternalFileIndexAccessor.java
@@ -92,7 +92,7 @@ public class ExternalFileIndexAccessor implements Serializable {
// create the accessor and the cursor using the passed version
ISearchOperationCallback searchCallback = opDesc.getSearchOpCallbackFactory()
- .createSearchOperationCallback(indexDataflowHelper.getResourceID(), ctx, null);
+ .createSearchOperationCallback(indexDataflowHelper.getResource().getId(), ctx, null);
fileIndexAccessor = index.createAccessor(searchCallback, indexDataflowHelper.getVersion());
fileIndexSearchCursor = fileIndexAccessor.createSearchCursor(false);
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/163c3be3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamFactory.java
index ae012f3..7aeb0bc 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/factory/LocalFSInputStreamFactory.java
@@ -18,7 +18,6 @@
*/
package org.apache.asterix.external.input.stream.factory;
-import java.io.File;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Map;
@@ -40,8 +39,7 @@ import org.apache.asterix.external.util.NodeResolverFactory;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.io.FileReference;
-import org.apache.hyracks.dataflow.std.file.FileSplit;
+import org.apache.hyracks.api.io.FileSplit;
public class LocalFSInputStreamFactory implements IInputStreamFactory {
@@ -66,7 +64,7 @@ public class LocalFSInputStreamFactory implements IInputStreamFactory {
ArrayList<Path> inputResources = new ArrayList<>();
for (int i = 0; i < inputFileSplits.length; i++) {
if (inputFileSplits[i].getNodeName().equals(nodeName)) {
- inputResources.add(inputFileSplits[i].getLocalFile().getFile().toPath());
+ inputResources.add(inputFileSplits[i].getFile(ctx.getIOManager()).toPath());
}
}
watcher = new FileSystemWatcher(inputResources, expression, isFeed);
@@ -115,7 +113,7 @@ public class LocalFSInputStreamFactory implements IInputStreamFactory {
}
nodeName = resolver.resolveNode(trimmedValue.split(":")[0]);
nodeLocalPath = trimmedValue.split("://")[1];
- FileSplit fileSplit = new FileSplit(nodeName, new FileReference(new File(nodeLocalPath)));
+ FileSplit fileSplit = new FileSplit(nodeName, nodeLocalPath, false);
inputFileSplits[count++] = fileSplit;
}
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/163c3be3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/AbstractExternalDatasetIndexesOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/AbstractExternalDatasetIndexesOperatorDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/AbstractExternalDatasetIndexesOperatorDescriptor.java
index 9b37722..078e4f8 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/AbstractExternalDatasetIndexesOperatorDescriptor.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/AbstractExternalDatasetIndexesOperatorDescriptor.java
@@ -26,6 +26,8 @@ import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.FileSplit;
import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
import org.apache.hyracks.dataflow.std.base.AbstractOperatorNodePushable;
import org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
@@ -78,8 +80,10 @@ public abstract class AbstractExternalDatasetIndexesOperatorDescriptor
@Override
public void initialize() throws HyracksDataException {
try {
+ FileSplit fileSplit = fileIndexInfo.getFileSplitProvider().getFileSplits()[partition];
+ FileReference fileRef = fileSplit.getFileRef(ctx.getIOManager());
// only in partition of device id = 0, we perform the operation on the files index
- if (fileIndexInfo.getFileSplitProvider().getFileSplits()[partition].getIODeviceId() == 0) {
+ if (fileRef.getDeviceHandle() == ctx.getIOManager().getIODevices().get(0)) {
performOpOnIndex(filesIndexDataflowHelperFactory, ctx, fileIndexInfo, partition);
}
// perform operation on btrees
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/163c3be3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalBTreeSearchOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalBTreeSearchOperatorDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalBTreeSearchOperatorDescriptor.java
index c1903e0..8202316 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalBTreeSearchOperatorDescriptor.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalBTreeSearchOperatorDescriptor.java
@@ -25,6 +25,7 @@ import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import org.apache.hyracks.api.dataflow.value.ITypeTraits;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
import org.apache.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
@@ -52,7 +53,7 @@ public class ExternalBTreeSearchOperatorDescriptor extends BTreeSearchOperatorDe
@Override
public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
- IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+ IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
return new ExternalBTreeSearchOperatorNodePushable(this, ctx, partition, recordDescProvider, lowKeyFields,
highKeyFields, lowKeyInclusive, highKeyInclusive);
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/163c3be3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalBTreeSearchOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalBTreeSearchOperatorNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalBTreeSearchOperatorNodePushable.java
index 9b83a12..69a2020 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalBTreeSearchOperatorNodePushable.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalBTreeSearchOperatorNodePushable.java
@@ -38,7 +38,7 @@ public class ExternalBTreeSearchOperatorNodePushable extends BTreeSearchOperator
public ExternalBTreeSearchOperatorNodePushable(ExternalBTreeSearchOperatorDescriptor opDesc,
IHyracksTaskContext ctx, int partition, IRecordDescriptorProvider recordDescProvider, int[] lowKeyFields,
- int[] highKeyFields, boolean lowKeyInclusive, boolean highKeyInclusive) {
+ int[] highKeyFields, boolean lowKeyInclusive, boolean highKeyInclusive) throws HyracksDataException {
super(opDesc, ctx, partition, recordDescProvider, lowKeyFields, highKeyFields, lowKeyInclusive,
highKeyInclusive, null, null);
}
@@ -73,7 +73,7 @@ public class ExternalBTreeSearchOperatorNodePushable extends BTreeSearchOperator
dos = tb.getDataOutput();
appender = new FrameTupleAppender(new VSizeFrame(ctx));
ISearchOperationCallback searchCallback = opDesc.getSearchOpCallbackFactory()
- .createSearchOperationCallback(indexHelper.getResourceID(), ctx, null);
+ .createSearchOperationCallback(indexHelper.getResource().getId(), ctx, null);
// The next line is the reason we override this method
indexAccessor = externalIndex.createAccessor(searchCallback, dataFlowHelper.getTargetVersion());
cursor = createCursor();
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/163c3be3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalDatasetIndexesAbortOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalDatasetIndexesAbortOperatorDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalDatasetIndexesAbortOperatorDescriptor.java
index 5255257..8321074 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalDatasetIndexesAbortOperatorDescriptor.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalDatasetIndexesAbortOperatorDescriptor.java
@@ -49,7 +49,7 @@ public class ExternalDatasetIndexesAbortOperatorDescriptor extends AbstractExter
protected void performOpOnIndex(IIndexDataflowHelperFactory indexDataflowHelperFactory, IHyracksTaskContext ctx,
IndexInfoOperatorDescriptor fileIndexInfo, int partition) throws Exception {
FileReference file = IndexFileNameUtil.getIndexAbsoluteFileRef(fileIndexInfo, partition, ctx.getIOManager());
- AbortRecoverLSMIndexFileManager fileManager = new AbortRecoverLSMIndexFileManager(file);
+ AbortRecoverLSMIndexFileManager fileManager = new AbortRecoverLSMIndexFileManager(ctx.getIOManager(), file);
fileManager.deleteTransactionFiles();
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/163c3be3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalDatasetIndexesRecoverOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalDatasetIndexesRecoverOperatorDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalDatasetIndexesRecoverOperatorDescriptor.java
index 82ca715..0312eac 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalDatasetIndexesRecoverOperatorDescriptor.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalDatasetIndexesRecoverOperatorDescriptor.java
@@ -49,7 +49,7 @@ public class ExternalDatasetIndexesRecoverOperatorDescriptor extends AbstractExt
protected void performOpOnIndex(IIndexDataflowHelperFactory indexDataflowHelperFactory, IHyracksTaskContext ctx,
IndexInfoOperatorDescriptor fileIndexInfo, int partition) throws Exception {
FileReference file = IndexFileNameUtil.getIndexAbsoluteFileRef(fileIndexInfo, partition, ctx.getIOManager());
- AbortRecoverLSMIndexFileManager fileManager = new AbortRecoverLSMIndexFileManager(file);
+ AbortRecoverLSMIndexFileManager fileManager = new AbortRecoverLSMIndexFileManager(ctx.getIOManager(), file);
fileManager.recoverTransaction();
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/163c3be3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkModifyOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkModifyOperatorNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkModifyOperatorNodePushable.java
index 5748a65..c95a4a7 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkModifyOperatorNodePushable.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkModifyOperatorNodePushable.java
@@ -45,7 +45,7 @@ public class ExternalIndexBulkModifyOperatorNodePushable extends IndexBulkLoadOp
public ExternalIndexBulkModifyOperatorNodePushable(ExternalIndexBulkModifyOperatorDescriptor opDesc,
IHyracksTaskContext ctx, int partition, int[] fieldPermutation, float fillFactor, long numElementsHint,
- IRecordDescriptorProvider recordDescProvider, int[] deletedFiles) {
+ IRecordDescriptorProvider recordDescProvider, int[] deletedFiles) throws HyracksDataException {
super(opDesc, ctx, partition, fieldPermutation, fillFactor, false, numElementsHint, false, recordDescProvider);
this.deletedFiles = deletedFiles;
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/163c3be3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalRTreeSearchOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalRTreeSearchOperatorDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalRTreeSearchOperatorDescriptor.java
index e217365..c0907ee 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalRTreeSearchOperatorDescriptor.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalRTreeSearchOperatorDescriptor.java
@@ -25,6 +25,7 @@ import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import org.apache.hyracks.api.dataflow.value.ITypeTraits;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
@@ -50,7 +51,7 @@ public class ExternalRTreeSearchOperatorDescriptor extends RTreeSearchOperatorDe
@Override
public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
- IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+ IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
return new ExternalRTreeSearchOperatorNodePushable(this, ctx, partition, recordDescProvider, keyFields);
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/163c3be3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalRTreeSearchOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalRTreeSearchOperatorNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalRTreeSearchOperatorNodePushable.java
index cda2985..97a38747 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalRTreeSearchOperatorNodePushable.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalRTreeSearchOperatorNodePushable.java
@@ -38,7 +38,7 @@ import org.apache.hyracks.storage.am.rtree.dataflow.RTreeSearchOperatorNodePusha
public class ExternalRTreeSearchOperatorNodePushable extends RTreeSearchOperatorNodePushable {
public ExternalRTreeSearchOperatorNodePushable(AbstractTreeIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx,
- int partition, IRecordDescriptorProvider recordDescProvider, int[] keyFields) {
+ int partition, IRecordDescriptorProvider recordDescProvider, int[] keyFields) throws HyracksDataException {
super(opDesc, ctx, partition, recordDescProvider, keyFields, null, null);
}
@@ -72,7 +72,7 @@ public class ExternalRTreeSearchOperatorNodePushable extends RTreeSearchOperator
dos = tb.getDataOutput();
appender = new FrameTupleAppender(new VSizeFrame(ctx));
ISearchOperationCallback searchCallback = opDesc.getSearchOpCallbackFactory()
- .createSearchOperationCallback(indexHelper.getResourceID(), ctx, null);
+ .createSearchOperationCallback(indexHelper.getResource().getId(), ctx, null);
// The next line is the reason we override this method
indexAccessor = rTreeIndex.createAccessor(searchCallback, rTreeDataflowHelper.getTargetVersion());
cursor = createCursor();
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/163c3be3/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
index 366534f..303f76d 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
@@ -37,9 +37,9 @@ import org.apache.hyracks.api.comm.VSizeFrame;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.FileSplit;
import org.apache.hyracks.api.io.IIOManager;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
-import org.apache.hyracks.dataflow.std.file.FileSplit;
import org.apache.hyracks.util.IntSerDeUtils;
public class FeedUtils {
@@ -82,7 +82,7 @@ public class FeedUtils {
// Note: feed adapter instances in a single node share the feed logger
// format: 'storage dir name'/partition_#/dataverse/feed/node
File f = new File(storagePartitionPath + File.separator + relPathFile + File.separator + nodeName);
- return StoragePathUtil.getFileSplitForClusterPartition(partition, f);
+ return StoragePathUtil.getFileSplitForClusterPartition(partition, f.getPath());
}
public static FileSplit[] splitsForAdapter(String dataverseName, String feedName,
@@ -100,20 +100,19 @@ public class FeedUtils {
}
public static FileReference getAbsoluteFileRef(String relativePath, int ioDeviceId, IIOManager ioManager) {
- return ioManager.getAbsoluteFileRef(ioDeviceId, relativePath);
+ return ioManager.getFileRef(ioDeviceId, relativePath);
}
public static FeedLogManager getFeedLogManager(IHyracksTaskContext ctx, int partition,
FileSplit[] feedLogFileSplits) throws HyracksDataException {
return new FeedLogManager(
- FeedUtils.getAbsoluteFileRef(feedLogFileSplits[partition].getLocalFile().getFile().getPath(),
- feedLogFileSplits[partition].getIODeviceId(), ctx.getIOManager()).getFile());
+ FeedUtils.getAbsoluteFileRef(feedLogFileSplits[partition].getPath(), 0, ctx.getIOManager()).getFile());
}
public static FeedLogManager getFeedLogManager(IHyracksTaskContext ctx, FileSplit feedLogFileSplit)
throws HyracksDataException {
- return new FeedLogManager(FeedUtils.getAbsoluteFileRef(feedLogFileSplit.getLocalFile().getFile().getPath(),
- feedLogFileSplit.getIODeviceId(), ctx.getIOManager()).getFile());
+ return new FeedLogManager(FeedUtils.getAbsoluteFileRef(feedLogFileSplit.getPath(),
+ 0, ctx.getIOManager()).getFile());
}
public static void processFeedMessage(ByteBuffer input, VSizeFrame message, FrameTupleAccessor fta)
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/163c3be3/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
index 6de796e..7d0d91c 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java
@@ -406,7 +406,7 @@ public class MetadataNode implements IMetadataNode {
private void insertTupleIntoIndex(JobId jobId, IMetadataIndex metadataIndex, ITupleReference tuple)
throws ACIDException, HyracksDataException, IndexException {
long resourceID = metadataIndex.getResourceID();
- String resourceName = metadataIndex.getFile().toString();
+ String resourceName = metadataIndex.getFile().getRelativePath();
ILSMIndex lsmIndex = (ILSMIndex) datasetLifecycleManager.get(resourceName);
try {
datasetLifecycleManager.open(resourceName);
@@ -673,7 +673,7 @@ public class MetadataNode implements IMetadataNode {
private void deleteTupleFromIndex(JobId jobId, IMetadataIndex metadataIndex, ITupleReference tuple)
throws ACIDException, HyracksDataException, IndexException {
long resourceID = metadataIndex.getResourceID();
- String resourceName = metadataIndex.getFile().toString();
+ String resourceName = metadataIndex.getFile().getRelativePath();
ILSMIndex lsmIndex = (ILSMIndex) datasetLifecycleManager.get(resourceName);
try {
datasetLifecycleManager.open(resourceName);
@@ -1148,7 +1148,7 @@ public class MetadataNode implements IMetadataNode {
if (index.getFile() == null) {
throw new MetadataException("No file for Index " + index.getDataverseName() + "." + index.getIndexName());
}
- String resourceName = index.getFile().toString();
+ String resourceName = index.getFile().getRelativePath();
IIndex indexInstance = datasetLifecycleManager.get(resourceName);
datasetLifecycleManager.open(resourceName);
IIndexAccessor indexAccessor = indexInstance.createAccessor(NoOpOperationCallback.INSTANCE,
@@ -1186,7 +1186,7 @@ public class MetadataNode implements IMetadataNode {
public void initializeDatasetIdFactory(JobId jobId) throws MetadataException, RemoteException {
int mostRecentDatasetId = MetadataIndexImmutableProperties.FIRST_AVAILABLE_USER_DATASET_ID;
try {
- String resourceName = MetadataPrimaryIndexes.DATASET_DATASET.getFile().toString();
+ String resourceName = MetadataPrimaryIndexes.DATASET_DATASET.getFile().getRelativePath();
IIndex indexInstance = datasetLifecycleManager.get(resourceName);
datasetLifecycleManager.open(resourceName);
try {
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/163c3be3/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
index 0990998..17ef709 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
@@ -28,7 +28,6 @@ import java.util.logging.Logger;
import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
import org.apache.asterix.common.api.IDatasetLifecycleManager;
-import org.apache.asterix.common.api.ILocalResourceMetadata;
import org.apache.asterix.common.cluster.ClusterPartition;
import org.apache.asterix.common.config.AsterixMetadataProperties;
import org.apache.asterix.common.config.ClusterProperties;
@@ -39,6 +38,7 @@ import org.apache.asterix.common.context.BaseOperationTracker;
import org.apache.asterix.common.context.CorrelatedPrefixMergePolicyFactory;
import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
+import org.apache.asterix.common.transactions.Resource;
import org.apache.asterix.common.utils.StoragePathUtil;
import org.apache.asterix.external.adapter.factory.GenericAdapterFactory;
import org.apache.asterix.external.api.IAdapterFactory;
@@ -341,7 +341,7 @@ public class MetadataBootstrap {
String metadataPartitionPath = StoragePathUtil.prepareStoragePartitionPath(
ClusterProperties.INSTANCE.getStorageDirectoryName(), metadataPartition.getPartitionId());
String resourceName = metadataPartitionPath + File.separator + index.getFileNameRelativePath();
- FileReference file = ioManager.getAbsoluteFileRef(metadataDeviceId, resourceName);
+ FileReference file = ioManager.getFileRef(metadataDeviceId, resourceName);
// this should not be done this way. dataset lifecycle manager shouldn't return virtual buffer caches for
// a dataset that was not yet created
@@ -356,9 +356,9 @@ public class MetadataBootstrap {
index.isPrimaryIndex() ? runtimeContext.getLSMBTreeOperationTracker(index.getDatasetId().getId())
: new BaseOperationTracker(index.getDatasetId().getId(),
dataLifecycleManager.getDatasetInfo(index.getDatasetId().getId()));
- final String absolutePath = file.getFile().getPath();
if (isNewUniverse()) {
- lsmBtree = LSMBTreeUtils.createLSMTree(virtualBufferCaches, file, bufferCache, fileMapProvider, typeTraits,
+ lsmBtree = LSMBTreeUtils.createLSMTree(ioManager, virtualBufferCaches, file, bufferCache, fileMapProvider,
+ typeTraits,
comparatorFactories, bloomFilterKeyFields, runtimeContext.getBloomFilterFalsePositiveRate(),
runtimeContext.getMetadataMergePolicyFactory()
.createMergePolicy(GlobalConfig.DEFAULT_COMPACTION_POLICY_PROPERTIES, dataLifecycleManager),
@@ -367,28 +367,32 @@ public class MetadataBootstrap {
null, null, null, null, true);
lsmBtree.create();
resourceID = index.getResourceID();
- ILocalResourceMetadata localResourceMetadata = new LSMBTreeLocalResourceMetadata(typeTraits,
+ Resource localResourceMetadata = new LSMBTreeLocalResourceMetadata(typeTraits,
comparatorFactories, bloomFilterKeyFields, index.isPrimaryIndex(), index.getDatasetId().getId(),
+ metadataPartition.getPartitionId(),
runtimeContext.getMetadataMergePolicyFactory(), GlobalConfig.DEFAULT_COMPACTION_POLICY_PROPERTIES,
null, null, null, null);
ILocalResourceFactoryProvider localResourceFactoryProvider =
- new PersistentLocalResourceFactoryProvider(localResourceMetadata, LocalResource.LSMBTreeResource);
+ new PersistentLocalResourceFactoryProvider(partition -> localResourceMetadata,
+ LocalResource.LSMBTreeResource);
ILocalResourceFactory localResourceFactory = localResourceFactoryProvider.getLocalResourceFactory();
localResourceRepository.insert(localResourceFactory.createLocalResource(resourceID, resourceName,
- metadataPartition.getPartitionId(), LIFOMetaDataFrame.VERSION, absolutePath));
- dataLifecycleManager.register(absolutePath, lsmBtree);
+ LIFOMetaDataFrame.VERSION, metadataPartition.getPartitionId()));
+ dataLifecycleManager.register(file.getRelativePath(), lsmBtree);
} else {
- final LocalResource resource = localResourceRepository.getResourceByPath(absolutePath);
+ final LocalResource resource = localResourceRepository.get(file.getRelativePath());
if (resource == null) {
throw new HyracksDataException("Could not find required metadata indexes. Please delete "
+ propertiesProvider.getMetadataProperties().getTransactionLogDirs()
.get(runtimeContext.getTransactionSubsystem().getId())
+ " to intialize as a new instance. (WARNING: all data will be lost.)");
}
- resourceID = resource.getResourceId();
- lsmBtree = (LSMBTree) dataLifecycleManager.get(absolutePath);
+ resourceID = resource.getId();
+ assert (index.getResourceID() == resource.getId());
+ lsmBtree = (LSMBTree) dataLifecycleManager.get(file.getRelativePath());
if (lsmBtree == null) {
- lsmBtree = LSMBTreeUtils.createLSMTree(virtualBufferCaches, file, bufferCache, fileMapProvider,
+ lsmBtree = LSMBTreeUtils.createLSMTree(ioManager, virtualBufferCaches, file, bufferCache,
+ fileMapProvider,
typeTraits, comparatorFactories, bloomFilterKeyFields,
runtimeContext.getBloomFilterFalsePositiveRate(),
runtimeContext.getMetadataMergePolicyFactory().createMergePolicy(
@@ -396,7 +400,7 @@ public class MetadataBootstrap {
opTracker, runtimeContext.getLSMIOScheduler(),
LSMBTreeIOOperationCallbackFactory.INSTANCE.createIOOperationCallback(), index.isPrimaryIndex(),
null, null, null, null, true);
- dataLifecycleManager.register(absolutePath, lsmBtree);
+ dataLifecycleManager.register(file.getRelativePath(), lsmBtree);
}
}
index.setResourceID(resourceID);
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/163c3be3/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FileSplitSinkId.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FileSplitSinkId.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FileSplitSinkId.java
index d4fe646..ba40430 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FileSplitSinkId.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FileSplitSinkId.java
@@ -19,7 +19,7 @@
package org.apache.asterix.metadata.declared;
-import org.apache.hyracks.dataflow.std.file.FileSplit;
+import org.apache.hyracks.api.io.FileSplit;
public class FileSplitSinkId {
@@ -31,7 +31,7 @@ public class FileSplitSinkId {
@Override
public String toString() {
- return fileSplit.getNodeName() + ":" + fileSplit.getLocalFile();
+ return fileSplit.getNodeName() + ":" + fileSplit.getPath();
}
public FileSplit getFileSplit() {
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/163c3be3/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index 7258be8..3ec10f0 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -138,11 +138,11 @@ import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
import org.apache.hyracks.api.dataflow.value.ITypeTraits;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.dataset.ResultSetId;
+import org.apache.hyracks.api.io.FileSplit;
import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
import org.apache.hyracks.data.std.primitive.ShortPointable;
import org.apache.hyracks.dataflow.common.data.marshalling.ShortSerializerDeserializer;
-import org.apache.hyracks.dataflow.std.file.FileSplit;
import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
import org.apache.hyracks.dataflow.std.result.ResultWriterOperatorDescriptor;
import org.apache.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
@@ -723,7 +723,7 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String>
FileSplitDataSink fsds = (FileSplitDataSink) sink;
FileSplitSinkId fssi = fsds.getId();
FileSplit fs = fssi.getFileSplit();
- File outFile = fs.getLocalFile().getFile();
+ File outFile = new File(fs.getPath());
String nodeId = fs.getNodeName();
SinkWriterRuntimeFactory runtime = new SinkWriterRuntimeFactory(printColumns, printerFactories, outFile,
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/163c3be3/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java
index 21e2150..24d185f 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java
@@ -33,7 +33,7 @@ import org.apache.asterix.runtime.util.ClusterStateManager;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.common.utils.Pair;
-import org.apache.hyracks.dataflow.std.file.FileSplit;
+import org.apache.hyracks.api.io.FileSplit;
import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
public class SplitsAndConstraintsUtil {
@@ -48,13 +48,9 @@ public class SplitsAndConstraintsUtil {
ClusterPartition[] clusterPartition = ClusterStateManager.INSTANCE.getClusterPartitons();
String storageDirName = ClusterProperties.INSTANCE.getStorageDirectoryName();
for (int j = 0; j < clusterPartition.length; j++) {
- int nodePartitions =
- ClusterStateManager.INSTANCE.getNodePartitionsCount(clusterPartition[j].getNodeId());
- for (int i = 0; i < nodePartitions; i++) {
File f = new File(StoragePathUtil.prepareStoragePartitionPath(storageDirName,
- clusterPartition[i].getPartitionId()) + File.separator + relPathFile);
- splits.add(StoragePathUtil.getFileSplitForClusterPartition(clusterPartition[j], f));
- }
+ clusterPartition[j].getPartitionId()) + File.separator + relPathFile);
+ splits.add(StoragePathUtil.getFileSplitForClusterPartition(clusterPartition[j], f.getPath()));
}
return splits.toArray(new FileSplit[] {});
}
@@ -87,7 +83,7 @@ public class SplitsAndConstraintsUtil {
nodePartitions[k].getPartitionId())
+ (temp ? (File.separator + StoragePathUtil.TEMP_DATASETS_STORAGE_FOLDER) : "")
+ File.separator + relPathFile);
- splits.add(StoragePathUtil.getFileSplitForClusterPartition(nodePartitions[k], f));
+ splits.add(StoragePathUtil.getFileSplitForClusterPartition(nodePartitions[k], f.getPath()));
}
}
return splits.toArray(new FileSplit[] {});
@@ -118,12 +114,14 @@ public class SplitsAndConstraintsUtil {
// Only the first partition when create
File f = new File(StoragePathUtil.prepareStoragePartitionPath(storageDirName,
nodePartitions[firstPartition].getPartitionId()) + File.separator + relPathFile);
- splits.add(StoragePathUtil.getFileSplitForClusterPartition(nodePartitions[firstPartition], f));
+ splits.add(StoragePathUtil.getFileSplitForClusterPartition(nodePartitions[firstPartition], f
+ .getPath()));
} else {
for (int k = 0; k < nodePartitions.length; k++) {
File f = new File(StoragePathUtil.prepareStoragePartitionPath(storageDirName,
nodePartitions[firstPartition].getPartitionId()) + File.separator + relPathFile);
- splits.add(StoragePathUtil.getFileSplitForClusterPartition(nodePartitions[firstPartition], f));
+ splits.add(StoragePathUtil.getFileSplitForClusterPartition(nodePartitions[firstPartition], f
+ .getPath()));
}
}
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/163c3be3/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java
index 6fc8fd5..d0a5ac6 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/ReplicaResourcesManager.java
@@ -166,7 +166,7 @@ public class ReplicaResourcesManager implements IReplicaResourcesManager {
File localResource = new File(
indexFolder + File.separator + PersistentLocalResourceRepository.METADATA_FILE_NAME);
LocalResource resource = PersistentLocalResourceRepository.readLocalResource(localResource);
- laggingReplicaIndexes.put(resource.getResourceId(), indexFolder.getAbsolutePath());
+ laggingReplicaIndexes.put(resource.getId(), indexFolder.getAbsolutePath());
}
}
} catch (HyracksDataException e) {
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/163c3be3/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdMessage.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdMessage.java
index c1319e0..b4bbe01 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdMessage.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ReportMaxResourceIdMessage.java
@@ -58,7 +58,7 @@ public class ReportMaxResourceIdMessage implements IApplicationMessage {
NodeControllerService ncs = cs;
IAsterixAppRuntimeContext appContext =
(IAsterixAppRuntimeContext) ncs.getApplicationContext().getApplicationObject();
- long maxResourceId = Math.max(appContext.getLocalResourceRepository().getMaxResourceID(),
+ long maxResourceId = Math.max(appContext.getLocalResourceRepository().maxId(),
MetadataIndexImmutableProperties.FIRST_AVAILABLE_USER_DATASET_ID);
ReportMaxResourceIdMessage maxResourceIdMsg = new ReportMaxResourceIdMessage(ncs.getId(), maxResourceId);
try {
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/163c3be3/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMInvertedIndexUpsertOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMInvertedIndexUpsertOperatorDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMInvertedIndexUpsertOperatorDescriptor.java
index 3db3de2..50c5d0c 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMInvertedIndexUpsertOperatorDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMInvertedIndexUpsertOperatorDescriptor.java
@@ -25,6 +25,7 @@ import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import org.apache.hyracks.api.dataflow.value.ITypeTraits;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
@@ -58,7 +59,7 @@ public class AsterixLSMInvertedIndexUpsertOperatorDescriptor
@Override
public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
- IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) {
+ IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
return new AsterixLSMSecondaryUpsertOperatorNodePushable(this, ctx, partition, fieldPermutation,
recordDescProvider, prevFieldPermutation);
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/163c3be3/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMPrimaryUpsertOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMPrimaryUpsertOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMPrimaryUpsertOperatorNodePushable.java
index 536366f..aad7222 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMPrimaryUpsertOperatorNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMPrimaryUpsertOperatorNodePushable.java
@@ -86,7 +86,7 @@ public class AsterixLSMPrimaryUpsertOperatorNodePushable extends LSMIndexInsertU
public AsterixLSMPrimaryUpsertOperatorNodePushable(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx,
int partition, int[] fieldPermutation, IRecordDescriptorProvider recordDescProvider, int numOfPrimaryKeys,
- ARecordType recordType, int filterFieldIndex) {
+ ARecordType recordType, int filterFieldIndex) throws HyracksDataException {
super(opDesc, ctx, partition, fieldPermutation, recordDescProvider, IndexOperation.UPSERT);
this.key = new PermutingFrameTupleReference();
this.numOfPrimaryKeys = numOfPrimaryKeys;
@@ -141,10 +141,9 @@ public class AsterixLSMPrimaryUpsertOperatorNodePushable extends LSMIndexInsertU
dos = tb.getDataOutput();
appender = new FrameTupleAppender(new VSizeFrame(ctx), true);
modCallback = opDesc.getModificationOpCallbackFactory().createModificationOperationCallback(
- indexHelper.getResourcePath(), indexHelper.getResourceID(), indexHelper.getResourcePartition(),
- index, ctx, this);
+ indexHelper.getResource(), ctx, this);
searchCallback = (LockThenSearchOperationCallback) opDesc.getSearchOpCallbackFactory()
- .createSearchOperationCallback(indexHelper.getResourceID(), ctx, this);
+ .createSearchOperationCallback(indexHelper.getResource().getId(), ctx, this);
indexAccessor = index.createAccessor(modCallback, searchCallback);
cursor = indexAccessor.createSearchCursor(false);
frameTuple = new FrameTupleReference();
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/163c3be3/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMSecondaryUpsertOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMSecondaryUpsertOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMSecondaryUpsertOperatorNodePushable.java
index 05b633d..b800c24 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMSecondaryUpsertOperatorNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMSecondaryUpsertOperatorNodePushable.java
@@ -56,7 +56,7 @@ public class AsterixLSMSecondaryUpsertOperatorNodePushable extends LSMIndexInser
public AsterixLSMSecondaryUpsertOperatorNodePushable(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx,
int partition, int[] fieldPermutation, IRecordDescriptorProvider recordDescProvider,
- int[] prevValuePermutation) {
+ int[] prevValuePermutation) throws HyracksDataException {
super(opDesc, ctx, partition, fieldPermutation, recordDescProvider, IndexOperation.UPSERT);
this.prevValueTuple.setFieldPermutation(prevValuePermutation);
this.numberOfFields = prevValuePermutation.length;
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/163c3be3/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
index e5f3555..4cbb8cf 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java
@@ -26,14 +26,17 @@ import org.apache.asterix.common.transactions.AbstractOperationCallbackFactory;
import org.apache.asterix.common.transactions.ITransactionContext;
import org.apache.asterix.common.transactions.ITransactionSubsystem;
import org.apache.asterix.common.transactions.JobId;
+import org.apache.asterix.common.transactions.Resource;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.storage.am.common.api.IResourceLifecycleManager;
+import org.apache.hyracks.storage.am.common.api.IIndex;
import org.apache.hyracks.storage.am.common.api.IModificationOperationCallback;
import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
+import org.apache.hyracks.storage.am.common.api.IResourceLifecycleManager;
import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import org.apache.hyracks.storage.common.file.LocalResource;
/**
* Assumes LSM-BTrees as primary indexes.
@@ -54,24 +57,23 @@ public class PrimaryIndexModificationOperationCallbackFactory extends AbstractOp
}
@Override
- public IModificationOperationCallback createModificationOperationCallback(String resourcePath, long resourceId,
- int resourcePartition, Object resource, IHyracksTaskContext ctx, IOperatorNodePushable operatorNodePushable)
- throws HyracksDataException {
-
+ public IModificationOperationCallback createModificationOperationCallback(LocalResource resource,
+ IHyracksTaskContext ctx, IOperatorNodePushable operatorNodePushable) throws HyracksDataException {
ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx);
- IResourceLifecycleManager indexLifeCycleManager =
+ IResourceLifecycleManager<IIndex> indexLifeCycleManager =
txnSubsystem.getAsterixAppRuntimeContextProvider().getDatasetLifecycleManager();
- ILSMIndex index = (ILSMIndex) indexLifeCycleManager.get(resourcePath);
+ ILSMIndex index = (ILSMIndex) indexLifeCycleManager.get(resource.getPath());
if (index == null) {
- throw new HyracksDataException("Index(id:" + resourceId + ") is not registered.");
+ throw new HyracksDataException("Index(id:" + resource.getId() + ") is not registered.");
}
try {
ITransactionContext txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(jobId, false);
+ Resource aResource = (Resource) resource.getResource();
IModificationOperationCallback modCallback = new PrimaryIndexModificationOperationCallback(datasetId,
- primaryKeyFields, txnCtx, txnSubsystem.getLockManager(), txnSubsystem, resourceId,
- resourcePartition, resourceType, indexOp, operatorNodePushable, logBeforeImage);
- txnCtx.registerIndexAndCallback(resourceId, index, (AbstractOperationCallback) modCallback, true);
+ primaryKeyFields, txnCtx, txnSubsystem.getLockManager(), txnSubsystem, resource.getId(),
+ aResource.partition(), resourceType, indexOp, operatorNodePushable, logBeforeImage);
+ txnCtx.registerIndexAndCallback(resource.getId(), index, (AbstractOperationCallback) modCallback, true);
return modCallback;
} catch (ACIDException e) {
throw new HyracksDataException(e);
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/163c3be3/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
index f9f2b89..3925bba 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
@@ -26,14 +26,16 @@ import org.apache.asterix.common.transactions.AbstractOperationCallbackFactory;
import org.apache.asterix.common.transactions.ITransactionContext;
import org.apache.asterix.common.transactions.ITransactionSubsystem;
import org.apache.asterix.common.transactions.JobId;
+import org.apache.asterix.common.transactions.Resource;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.storage.am.common.api.IResourceLifecycleManager;
import org.apache.hyracks.storage.am.common.api.IModificationOperationCallback;
import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
+import org.apache.hyracks.storage.am.common.api.IResourceLifecycleManager;
import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import org.apache.hyracks.storage.common.file.LocalResource;
public class SecondaryIndexModificationOperationCallbackFactory extends AbstractOperationCallbackFactory
implements IModificationOperationCallbackFactory {
@@ -51,23 +53,23 @@ public class SecondaryIndexModificationOperationCallbackFactory extends Abstract
}
@Override
- public IModificationOperationCallback createModificationOperationCallback(String resourcePath, long resourceId,
- int resourcePartition, Object resource, IHyracksTaskContext ctx, IOperatorNodePushable operatorNodePushable)
- throws HyracksDataException {
+ public IModificationOperationCallback createModificationOperationCallback(LocalResource resource,
+ IHyracksTaskContext ctx, IOperatorNodePushable operatorNodePushable) throws HyracksDataException {
ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx);
IResourceLifecycleManager indexLifeCycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
.getDatasetLifecycleManager();
- ILSMIndex index = (ILSMIndex) indexLifeCycleManager.get(resourcePath);
+ ILSMIndex index = (ILSMIndex) indexLifeCycleManager.get(resource.getPath());
if (index == null) {
- throw new HyracksDataException("Index(id:" + resourceId + ") is not registered.");
+ throw new HyracksDataException("Index(id:" + resource.getId() + ") is not registered.");
}
try {
ITransactionContext txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(jobId, false);
+ Resource aResource = (Resource) resource.getResource();
IModificationOperationCallback modCallback = new SecondaryIndexModificationOperationCallback(datasetId,
- primaryKeyFields, txnCtx, txnSubsystem.getLockManager(), txnSubsystem, resourceId,
- resourcePartition, resourceType, indexOp, logBeforeImage);
- txnCtx.registerIndexAndCallback(resourceId, index, (AbstractOperationCallback) modCallback, false);
+ primaryKeyFields, txnCtx, txnSubsystem.getLockManager(), txnSubsystem, resource.getId(),
+ aResource.partition(), resourceType, indexOp, logBeforeImage);
+ txnCtx.registerIndexAndCallback(resource.getId(), index, (AbstractOperationCallback) modCallback, false);
return modCallback;
} catch (ACIDException e) {
throw new HyracksDataException(e);
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/163c3be3/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java
index df7d65d..6e07ea3 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java
@@ -26,14 +26,16 @@ import org.apache.asterix.common.transactions.AbstractOperationCallbackFactory;
import org.apache.asterix.common.transactions.ITransactionContext;
import org.apache.asterix.common.transactions.ITransactionSubsystem;
import org.apache.asterix.common.transactions.JobId;
+import org.apache.asterix.common.transactions.Resource;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.storage.am.common.api.IResourceLifecycleManager;
import org.apache.hyracks.storage.am.common.api.IModificationOperationCallback;
import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
+import org.apache.hyracks.storage.am.common.api.IResourceLifecycleManager;
import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import org.apache.hyracks.storage.common.file.LocalResource;
public class TempDatasetPrimaryIndexModificationOperationCallbackFactory extends AbstractOperationCallbackFactory
implements IModificationOperationCallbackFactory {
@@ -49,23 +51,23 @@ public class TempDatasetPrimaryIndexModificationOperationCallbackFactory extends
}
@Override
- public IModificationOperationCallback createModificationOperationCallback(String resourcePath, long resourceId,
- int resourcePartition, Object resource, IHyracksTaskContext ctx, IOperatorNodePushable operatorNodePushable)
- throws HyracksDataException {
+ public IModificationOperationCallback createModificationOperationCallback(LocalResource resource,
+ IHyracksTaskContext ctx, IOperatorNodePushable operatorNodePushable) throws HyracksDataException {
ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx);
IResourceLifecycleManager indexLifeCycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
.getDatasetLifecycleManager();
- ILSMIndex index = (ILSMIndex) indexLifeCycleManager.get(resourcePath);
+ ILSMIndex index = (ILSMIndex) indexLifeCycleManager.get(resource.getPath());
if (index == null) {
- throw new HyracksDataException("Index(id:" + resourceId + ") is not registered.");
+ throw new HyracksDataException("Index(id:" + resource.getId() + ") is not registered.");
}
try {
ITransactionContext txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(jobId, false);
+ Resource aResource = (Resource) resource.getResource();
IModificationOperationCallback modCallback = new TempDatasetIndexModificationOperationCallback(datasetId,
- primaryKeyFields, txnCtx, txnSubsystem.getLockManager(), txnSubsystem, resourceId,
- resourcePartition, resourceType, indexOp);
- txnCtx.registerIndexAndCallback(resourceId, index, (AbstractOperationCallback) modCallback, true);
+ primaryKeyFields, txnCtx, txnSubsystem.getLockManager(), txnSubsystem, resource.getId(),
+ aResource.partition(), resourceType, indexOp);
+ txnCtx.registerIndexAndCallback(resource.getId(), index, (AbstractOperationCallback) modCallback, true);
return modCallback;
} catch (ACIDException e) {
throw new HyracksDataException(e);
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/163c3be3/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java
index 39e916b..2740994 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java
@@ -26,14 +26,16 @@ import org.apache.asterix.common.transactions.AbstractOperationCallbackFactory;
import org.apache.asterix.common.transactions.ITransactionContext;
import org.apache.asterix.common.transactions.ITransactionSubsystem;
import org.apache.asterix.common.transactions.JobId;
+import org.apache.asterix.common.transactions.Resource;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.storage.am.common.api.IResourceLifecycleManager;
import org.apache.hyracks.storage.am.common.api.IModificationOperationCallback;
import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
+import org.apache.hyracks.storage.am.common.api.IResourceLifecycleManager;
import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import org.apache.hyracks.storage.common.file.LocalResource;
public class TempDatasetSecondaryIndexModificationOperationCallbackFactory extends AbstractOperationCallbackFactory
implements IModificationOperationCallbackFactory {
@@ -49,23 +51,23 @@ public class TempDatasetSecondaryIndexModificationOperationCallbackFactory exten
}
@Override
- public IModificationOperationCallback createModificationOperationCallback(String resourcePath, long resourceId,
- int resourcePartition, Object resource, IHyracksTaskContext ctx, IOperatorNodePushable operatorNodePushable)
- throws HyracksDataException {
+ public IModificationOperationCallback createModificationOperationCallback(LocalResource resource,
+ IHyracksTaskContext ctx, IOperatorNodePushable operatorNodePushable) throws HyracksDataException {
+ Resource aResource = (Resource) resource.getResource();
ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx);
IResourceLifecycleManager indexLifeCycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
.getDatasetLifecycleManager();
- ILSMIndex index = (ILSMIndex) indexLifeCycleManager.get(resourcePath);
+ ILSMIndex index = (ILSMIndex) indexLifeCycleManager.get(resource.getPath());
if (index == null) {
- throw new HyracksDataException("Index(id:" + resourceId + ") is not registered.");
+ throw new HyracksDataException("Index(id:" + resource.getId() + ") is not registered.");
}
try {
ITransactionContext txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(jobId, false);
IModificationOperationCallback modCallback = new TempDatasetIndexModificationOperationCallback(datasetId,
- primaryKeyFields, txnCtx, txnSubsystem.getLockManager(), txnSubsystem, resourceId,
- resourcePartition, resourceType, indexOp);
- txnCtx.registerIndexAndCallback(resourceId, index, (AbstractOperationCallback) modCallback, false);
+ primaryKeyFields, txnCtx, txnSubsystem.getLockManager(), txnSubsystem, resource.getId(),
+ aResource.partition(), resourceType, indexOp);
+ txnCtx.registerIndexAndCallback(resource.getId(), index, (AbstractOperationCallback) modCallback, false);
return modCallback;
} catch (ACIDException e) {
throw new HyracksDataException(e);
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/163c3be3/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java
index 85cdc89..9349e93 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java
@@ -25,14 +25,16 @@ import org.apache.asterix.common.transactions.AbstractOperationCallbackFactory;
import org.apache.asterix.common.transactions.ITransactionContext;
import org.apache.asterix.common.transactions.ITransactionSubsystem;
import org.apache.asterix.common.transactions.JobId;
+import org.apache.asterix.common.transactions.Resource;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.storage.am.common.api.IResourceLifecycleManager;
import org.apache.hyracks.storage.am.common.api.IModificationOperationCallback;
import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
+import org.apache.hyracks.storage.am.common.api.IResourceLifecycleManager;
import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import org.apache.hyracks.storage.common.file.LocalResource;
public class UpsertOperationCallbackFactory extends AbstractOperationCallbackFactory
implements IModificationOperationCallbackFactory {
@@ -50,24 +52,23 @@ public class UpsertOperationCallbackFactory extends AbstractOperationCallbackFac
}
@Override
- public IModificationOperationCallback createModificationOperationCallback(String resourceName, long resourceId,
- int resourcePartition, Object resource, IHyracksTaskContext ctx, IOperatorNodePushable operatorNodePushable)
- throws HyracksDataException {
-
+ public IModificationOperationCallback createModificationOperationCallback(LocalResource resource,
+ IHyracksTaskContext ctx, IOperatorNodePushable operatorNodePushable) throws HyracksDataException {
+ Resource aResource = (Resource) resource.getResource();
ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx);
- IResourceLifecycleManager indexLifeCycleManager =
- txnSubsystem.getAsterixAppRuntimeContextProvider().getDatasetLifecycleManager();
- ILSMIndex index = (ILSMIndex) indexLifeCycleManager.get(resourceName);
+ IResourceLifecycleManager indexLifeCycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
+ .getDatasetLifecycleManager();
+ ILSMIndex index = (ILSMIndex) indexLifeCycleManager.get(resource.getPath());
if (index == null) {
- throw new HyracksDataException("Index(id:" + resourceId + ") is not registered.");
+ throw new HyracksDataException("Index(id:" + resource.getId() + ") is not registered.");
}
try {
ITransactionContext txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(jobId, false);
- IModificationOperationCallback modCallback =
- new UpsertOperationCallback(datasetId, primaryKeyFields, txnCtx, txnSubsystem.getLockManager(),
- txnSubsystem, resourceId, resourcePartition, resourceType, indexOp, logBeforeImage);
- txnCtx.registerIndexAndCallback(resourceId, index, (AbstractOperationCallback) modCallback, true);
+ IModificationOperationCallback modCallback = new UpsertOperationCallback(datasetId, primaryKeyFields,
+ txnCtx, txnSubsystem.getLockManager(),
+ txnSubsystem, resource.getId(), aResource.partition(), resourceType, indexOp, logBeforeImage);
+ txnCtx.registerIndexAndCallback(resource.getId(), index, (AbstractOperationCallback) modCallback, true);
return modCallback;
} catch (ACIDException e) {
throw new HyracksDataException(e);
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/163c3be3/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/AbstractLSMLocalResourceMetadata.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/AbstractLSMLocalResourceMetadata.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/AbstractLSMLocalResourceMetadata.java
deleted file mode 100644
index 3367d2e..0000000
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/AbstractLSMLocalResourceMetadata.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.transaction.management.resource;
-
-import org.apache.asterix.common.api.ILocalResourceMetadata;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
-import org.apache.hyracks.api.dataflow.value.ITypeTraits;
-
-public abstract class AbstractLSMLocalResourceMetadata implements ILocalResourceMetadata {
-
- private static final long serialVersionUID = 1L;
-
- protected final int datasetID;
- protected final ITypeTraits[] filterTypeTraits;
- protected final IBinaryComparatorFactory[] filterCmpFactories;
- protected final int[] filterFields;
-
- public AbstractLSMLocalResourceMetadata(int datasetID, ITypeTraits[] filterTypeTraits,
- IBinaryComparatorFactory[] filterCmpFactories, int[] filterFields) {
- this.datasetID = datasetID;
- this.filterTypeTraits = filterTypeTraits;
- this.filterCmpFactories = filterCmpFactories;
- this.filterFields = filterFields;
- }
-
- public int getDatasetID() {
- return datasetID;
- }
-}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/163c3be3/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/ExternalBTreeLocalResourceMetadata.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/ExternalBTreeLocalResourceMetadata.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/ExternalBTreeLocalResourceMetadata.java
index b76e829..3083cfe 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/ExternalBTreeLocalResourceMetadata.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/ExternalBTreeLocalResourceMetadata.java
@@ -18,7 +18,6 @@
*/
package org.apache.asterix.transaction.management.resource;
-import java.io.File;
import java.util.Map;
import org.apache.asterix.common.context.BaseOperationTracker;
@@ -26,35 +25,39 @@ import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactor
import org.apache.asterix.common.transactions.IAsterixAppRuntimeContextProvider;
import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import org.apache.hyracks.api.dataflow.value.ITypeTraits;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
+import org.apache.hyracks.api.io.IIOManager;
import org.apache.hyracks.storage.am.lsm.btree.impls.LSMBTree;
import org.apache.hyracks.storage.am.lsm.btree.util.LSMBTreeUtils;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
+import org.apache.hyracks.storage.common.file.LocalResource;
public class ExternalBTreeLocalResourceMetadata extends LSMBTreeLocalResourceMetadata {
private static final long serialVersionUID = 1L;
public ExternalBTreeLocalResourceMetadata(ITypeTraits[] typeTraits, IBinaryComparatorFactory[] cmpFactories,
- int[] bloomFilterKeyFields, boolean isPrimary, int datasetID, ILSMMergePolicyFactory mergePolicyFactory,
+ int[] bloomFilterKeyFields, boolean isPrimary, int datasetID, int partition,
+ ILSMMergePolicyFactory mergePolicyFactory,
Map<String, String> mergePolicyProperties) {
- super(typeTraits, cmpFactories, bloomFilterKeyFields, isPrimary, datasetID, mergePolicyFactory,
+ super(typeTraits, cmpFactories, bloomFilterKeyFields, isPrimary, datasetID, partition, mergePolicyFactory,
mergePolicyProperties, null, null, null, null);
}
@Override
- public ILSMIndex createIndexInstance(IAsterixAppRuntimeContextProvider runtimeContextProvider, String filePath,
- int partition, int ioDeviceNum) {
- FileReference file = new FileReference(new File(filePath));
-
- LSMBTree lsmBTree = LSMBTreeUtils.createExternalBTree(file, runtimeContextProvider.getBufferCache(),
+ public ILSMIndex createIndexInstance(IAsterixAppRuntimeContextProvider runtimeContextProvider,
+ LocalResource resource) throws HyracksDataException {
+ IIOManager ioManager = runtimeContextProvider.getIOManager();
+ FileReference file = ioManager.getFileRef(resource.getPath(), true);
+ LSMBTree lsmBTree = LSMBTreeUtils.createExternalBTree(ioManager, file, runtimeContextProvider.getBufferCache(),
runtimeContextProvider.getFileMapManager(), typeTraits, cmpFactories, bloomFilterKeyFields,
runtimeContextProvider.getBloomFilterFalsePositiveRate(),
mergePolicyFactory.createMergePolicy(mergePolicyProperties,
runtimeContextProvider.getDatasetLifecycleManager()),
- new BaseOperationTracker(datasetID,
- runtimeContextProvider.getDatasetLifecycleManager().getDatasetInfo(datasetID)),
+ new BaseOperationTracker(datasetId(),
+ runtimeContextProvider.getDatasetLifecycleManager().getDatasetInfo(datasetId())),
runtimeContextProvider.getLSMIOScheduler(),
LSMBTreeIOOperationCallbackFactory.INSTANCE.createIOOperationCallback(), -1, true);
return lsmBTree;
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/163c3be3/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/ExternalBTreeLocalResourceMetadataFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/ExternalBTreeLocalResourceMetadataFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/ExternalBTreeLocalResourceMetadataFactory.java
new file mode 100644
index 0000000..61d19c3
--- /dev/null
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/ExternalBTreeLocalResourceMetadataFactory.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.transaction.management.resource;
+
+import java.util.Map;
+
+import org.apache.asterix.common.transactions.Resource;
+import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
+import org.apache.hyracks.api.dataflow.value.ITypeTraits;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
+
+public class ExternalBTreeLocalResourceMetadataFactory extends LSMBTreeLocalResourceMetadataFactory {
+
+ private static final long serialVersionUID = 1L;
+
+ public ExternalBTreeLocalResourceMetadataFactory(ITypeTraits[] typeTraits, IBinaryComparatorFactory[] cmpFactories,
+ int[] bloomFilterKeyFields, boolean isPrimary, int datasetID,
+ ILSMMergePolicyFactory mergePolicyFactory,
+ Map<String, String> mergePolicyProperties) {
+ super(typeTraits, cmpFactories, bloomFilterKeyFields, isPrimary, datasetID, mergePolicyFactory,
+ mergePolicyProperties, null, null, null, null);
+ }
+
+ @Override
+ public Resource resource(int partition) {
+ return new ExternalBTreeLocalResourceMetadata(filterTypeTraits, filterCmpFactories, bloomFilterKeyFields,
+ isPrimary, datasetId, partition, mergePolicyFactory, mergePolicyProperties);
+ }
+}