You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by im...@apache.org on 2015/05/20 03:25:43 UTC
[02/10] incubator-asterixdb git commit: Temp dataset support: 1. DDLs
for creating a temporary dataset 2. Garbage collection for temporary dataset
3. Reading, inserting,
and deleting data from (to) a temporary dataset is locking-free (except
metadata loc
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ccd67fe8/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataManager.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataManager.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataManager.java
index 59e255e..0b3bae6 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataManager.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/api/IMetadataManager.java
@@ -616,7 +616,8 @@ public interface IMetadataManager {
* added
* @throws MetadataException
*/
- public void addExternalFile(MetadataTransactionContext mdTxnCtx, ExternalFile externalFile) throws MetadataException;
+ public void addExternalFile(MetadataTransactionContext mdTxnCtx, ExternalFile externalFile)
+ throws MetadataException;
/**
* @param mdTxnCtx
@@ -637,7 +638,8 @@ public interface IMetadataManager {
* dropped
* @throws MetadataException
*/
- public void dropExternalFile(MetadataTransactionContext mdTxnCtx, ExternalFile externalFile) throws MetadataException;
+ public void dropExternalFile(MetadataTransactionContext mdTxnCtx, ExternalFile externalFile)
+ throws MetadataException;
/**
* @param mdTxnCtx
@@ -648,9 +650,10 @@ public interface IMetadataManager {
* @throws MetadataException
*/
public void dropDatasetExternalFiles(MetadataTransactionContext mdTxnCtx, Dataset dataset) throws MetadataException;
-
+
/**
* Get en external file
+ *
* @param mdTxnCtx
* @param dataverseName
* @param datasetName
@@ -658,9 +661,9 @@ public interface IMetadataManager {
* @return
* @throws MetadataException
*/
- public ExternalFile getExternalFile(MetadataTransactionContext mdTxnCtx, String dataverseName, String datasetName, Integer fileNumber)
- throws MetadataException;
-
+ public ExternalFile getExternalFile(MetadataTransactionContext mdTxnCtx, String dataverseName, String datasetName,
+ Integer fileNumber) throws MetadataException;
+
/**
* update an existing dataset in metadata.
*
@@ -672,5 +675,12 @@ public interface IMetadataManager {
* For example, if the dataset already exists.
*/
public void updateDataset(MetadataTransactionContext ctx, Dataset dataset) throws MetadataException;
-
+
+ /**
+ * Clean up temporary datasets that have not been active for a long time.
+ *
+ * @throws MetadataException
+ */
+ public void cleanupTempDatasets() throws MetadataException;
+
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ccd67fe8/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
index af425d5..c93c29e 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/bootstrap/MetadataBootstrap.java
@@ -239,7 +239,7 @@ public class MetadataBootstrap {
primaryIndexes[i].getPartitioningExpr(), primaryIndexes[i].getPartitioningExpr(),
primaryIndexes[i].getPartitioningExprType(), primaryIndexes[i].getNodeGroupName(), false,
GlobalConfig.DEFAULT_COMPACTION_POLICY_NAME, GlobalConfig.DEFAULT_COMPACTION_POLICY_PROPERTIES,
- null);
+ null, false);
MetadataManager.INSTANCE.addDataset(mdTxnCtx, new Dataset(primaryIndexes[i].getDataverseName(),
primaryIndexes[i].getIndexedDatasetName(), primaryIndexes[i].getPayloadRecordType().getTypeName(),
id, new HashMap<String, String>(), DatasetType.INTERNAL, primaryIndexes[i].getDatasetId().getId(),
@@ -390,7 +390,7 @@ public class MetadataBootstrap {
ILSMOperationTracker opTracker = index.isPrimaryIndex() ? runtimeContext.getLSMBTreeOperationTracker(index
.getDatasetId().getId()) : new BaseOperationTracker((DatasetLifecycleManager) indexLifecycleManager,
index.getDatasetId().getId(), ((DatasetLifecycleManager) indexLifecycleManager).getDatasetInfo(index
- .getDatasetId().getId()));
+ .getDatasetId().getId()));
final String path = file.getFile().getPath();
if (create) {
lsmBtree = LSMBTreeUtils.createLSMTree(
@@ -406,7 +406,7 @@ public class MetadataBootstrap {
GlobalConfig.DEFAULT_COMPACTION_POLICY_PROPERTIES, indexLifecycleManager), opTracker,
runtimeContext.getLSMIOScheduler(),
LSMBTreeIOOperationCallbackFactory.INSTANCE.createIOOperationCallback(), index.isPrimaryIndex(),
- null, null, null, null);
+ null, null, null, null, true);
lsmBtree.create();
resourceID = runtimeContext.getResourceIdFactory().createId();
ILocalResourceMetadata localResourceMetadata = new LSMBTreeLocalResourceMetadata(typeTraits,
@@ -435,7 +435,7 @@ public class MetadataBootstrap {
runtimeContext.getMetadataMergePolicyFactory().createMergePolicy(
GlobalConfig.DEFAULT_COMPACTION_POLICY_PROPERTIES, indexLifecycleManager), opTracker,
runtimeContext.getLSMIOScheduler(), LSMBTreeIOOperationCallbackFactory.INSTANCE
- .createIOOperationCallback(), index.isPrimaryIndex(), null, null, null, null);
+ .createIOOperationCallback(), index.isPrimaryIndex(), null, null, null, null, true);
indexLifecycleManager.register(resourceID, lsmBtree);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ccd67fe8/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
index bc05e04..66a932e 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/declared/AqlMetadataProvider.java
@@ -97,6 +97,8 @@ import edu.uci.ics.asterix.runtime.external.ExternalRTreeSearchOperatorDescripto
import edu.uci.ics.asterix.runtime.formats.FormatUtils;
import edu.uci.ics.asterix.runtime.formats.NonTaggedDataFormat;
import edu.uci.ics.asterix.runtime.job.listener.JobEventListenerFactory;
+import edu.uci.ics.asterix.transaction.management.opcallbacks.TempDatasetPrimaryIndexModificationOperationCallbackFactory;
+import edu.uci.ics.asterix.transaction.management.opcallbacks.TempDatasetSecondaryIndexModificationOperationCallbackFactory;
import edu.uci.ics.asterix.transaction.management.opcallbacks.PrimaryIndexInstantSearchOperationCallbackFactory;
import edu.uci.ics.asterix.transaction.management.opcallbacks.PrimaryIndexModificationOperationCallbackFactory;
import edu.uci.ics.asterix.transaction.management.opcallbacks.PrimaryIndexOperationTrackerProvider;
@@ -152,11 +154,13 @@ import edu.uci.ics.hyracks.dataflow.std.file.ITupleParserFactory;
import edu.uci.ics.hyracks.dataflow.std.result.ResultWriterOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
import edu.uci.ics.hyracks.storage.am.btree.frames.BTreeNSMInteriorFrameFactory;
+import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
import edu.uci.ics.hyracks.storage.am.common.api.IPrimitiveValueProviderFactory;
import edu.uci.ics.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
import edu.uci.ics.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
import edu.uci.ics.hyracks.storage.am.common.dataflow.TreeIndexBulkLoadOperatorDescriptor;
+import edu.uci.ics.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
import edu.uci.ics.hyracks.storage.am.common.tuples.TypeAwareTupleWriterFactory;
import edu.uci.ics.hyracks.storage.am.lsm.btree.dataflow.ExternalBTreeWithBuddyDataflowHelperFactory;
@@ -176,7 +180,7 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
private static Logger LOGGER = Logger.getLogger(AqlMetadataProvider.class.getName());
private MetadataTransactionContext mdTxnCtx;
private boolean isWriteTransaction;
- private Map<String, String[]> stores;
+ private final Map<String, String[]> stores;
private Map<String, String> config;
private IAWriterFactory writerFactory;
private FileSplit outputFile;
@@ -187,6 +191,7 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
private final Dataverse defaultDataverse;
private JobId jobId;
private Map<String, Integer> locks;
+ private boolean isTemporaryDatasetWriteJob = true;
private final AsterixStorageProperties storageProperties;
@@ -299,7 +304,7 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
@Override
public AqlDataSource findDataSource(AqlSourceId id) throws AlgebricksException {
- AqlSourceId aqlId = (AqlSourceId) id;
+ AqlSourceId aqlId = id;
try {
return lookupSourceInMetadata(aqlId);
} catch (MetadataException e) {
@@ -308,9 +313,15 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
}
public boolean isWriteTransaction() {
+ // The transaction writes persistent datasets.
return isWriteTransaction;
}
+ public boolean isTemporaryDatasetWriteJob() {
+ // The transaction only writes temporary datasets.
+ return isTemporaryDatasetWriteJob;
+ }
+
@Override
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getScannerRuntime(
IDataSource<AqlSourceId> dataSource, List<LogicalVariable> scanVariables,
@@ -557,15 +568,14 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
case INTERNAL:
feedIngestor = new FeedIntakeOperatorDescriptor(jobSpec, new FeedConnectionId(
feedDataSource.getDatasourceDataverse(), feedDataSource.getDatasourceName(), feedDataSource
- .getFeedConnectionId().getDatasetName()), adapterFactory,
- (ARecordType) adapterOutputType, feedDesc, feedPolicy.getProperties());
+ .getFeedConnectionId().getDatasetName()), adapterFactory, adapterOutputType,
+ feedDesc, feedPolicy.getProperties());
break;
case EXTERNAL:
String libraryName = feedDataSource.getFeed().getAdapterName().split("#")[0];
feedIngestor = new FeedIntakeOperatorDescriptor(jobSpec, feedDataSource.getFeedConnectionId(),
libraryName, adapterFactory.getClass().getName(), feedDataSource.getFeed()
- .getAdapterConfiguration(), (ARecordType) adapterOutputType, feedDesc,
- feedPolicy.getProperties());
+ .getAdapterConfiguration(), adapterOutputType, feedDesc, feedPolicy.getProperties());
break;
}
if (LOGGER.isLoggable(Level.INFO)) {
@@ -611,6 +621,7 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
boolean isSecondary = true;
int numSecondaryKeys = 0;
try {
+ boolean temp = dataset.getDatasetDetails().isTemp();
Index primaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
dataset.getDatasetName(), dataset.getDatasetName());
if (primaryIndex != null && dataset.getDatasetType() != DatasetType.EXTERNAL) {
@@ -670,14 +681,15 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc;
try {
spPc = splitProviderAndPartitionConstraintsForDataset(dataset.getDataverseName(),
- dataset.getDatasetName(), indexName);
+ dataset.getDatasetName(), indexName, temp);
} catch (Exception e) {
throw new AlgebricksException(e);
}
ISearchOperationCallbackFactory searchCallbackFactory = null;
if (isSecondary) {
- searchCallbackFactory = new SecondaryIndexSearchOperationCallbackFactory();
+ searchCallbackFactory = temp ? NoOpOperationCallbackFactory.INSTANCE
+ : new SecondaryIndexSearchOperationCallbackFactory();
} else {
JobId jobId = ((JobEventListenerFactory) jobSpec.getJobletEventListenerFactory()).getJobId();
int datasetId = dataset.getDatasetId();
@@ -689,11 +701,13 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
AqlMetadataImplConfig aqlMetadataImplConfig = (AqlMetadataImplConfig) implConfig;
ITransactionSubsystemProvider txnSubsystemProvider = new TransactionSubsystemProvider();
if (aqlMetadataImplConfig != null && aqlMetadataImplConfig.isInstantLock()) {
- searchCallbackFactory = new PrimaryIndexInstantSearchOperationCallbackFactory(jobId, datasetId,
- primaryKeyFields, txnSubsystemProvider, ResourceType.LSM_BTREE);
+ searchCallbackFactory = temp ? NoOpOperationCallbackFactory.INSTANCE
+ : new PrimaryIndexInstantSearchOperationCallbackFactory(jobId, datasetId, primaryKeyFields,
+ txnSubsystemProvider, ResourceType.LSM_BTREE);
} else {
- searchCallbackFactory = new PrimaryIndexSearchOperationCallbackFactory(jobId, datasetId,
- primaryKeyFields, txnSubsystemProvider, ResourceType.LSM_BTREE);
+ searchCallbackFactory = temp ? NoOpOperationCallbackFactory.INSTANCE
+ : new PrimaryIndexSearchOperationCallbackFactory(jobId, datasetId, primaryKeyFields,
+ txnSubsystemProvider, ResourceType.LSM_BTREE);
}
}
Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(
@@ -711,7 +725,7 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
dataset.getDatasetId()), rtcProvider,
LSMBTreeIOOperationCallbackFactory.INSTANCE,
storageProperties.getBloomFilterFalsePositiveRate(), !isSecondary, filterTypeTraits,
- filterCmpFactories, btreeFields, filterFields), retainInput, retainNull,
+ filterCmpFactories, btreeFields, filterFields, !temp), retainInput, retainNull,
context.getNullWriterFactory(), searchCallbackFactory, minFilterFieldIndexes,
maxFilterFieldIndexes);
} else {
@@ -723,7 +737,7 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
LSMBTreeWithBuddyIOOperationCallbackFactory.INSTANCE, getStorageProperties()
.getBloomFilterFalsePositiveRate(), buddyBreeFields,
- ExternalDatasetsRegistry.INSTANCE.getAndLockDatasetVersion(dataset, this));
+ ExternalDatasetsRegistry.INSTANCE.getAndLockDatasetVersion(dataset, this), !temp);
btreeSearchOp = new ExternalBTreeSearchOperatorDescriptor(jobSpec, outputRecDesc, rtcProvider,
rtcProvider, spPc.first, typeTraits, comparatorFactories, bloomFilterKeyFields, lowKeyFields,
highKeyFields, lowKeyInclusive, highKeyInclusive, indexDataflowHelperFactory, retainInput,
@@ -746,6 +760,7 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
ARecordType recType = (ARecordType) findType(dataset.getDataverseName(), dataset.getItemTypeName());
int numPrimaryKeys = DatasetUtils.getPartitioningKeys(dataset).size();
+ boolean temp = dataset.getDatasetDetails().isTemp();
Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
dataset.getDatasetName(), indexName);
if (secondaryIndex == null) {
@@ -786,7 +801,7 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
numNestedSecondaryKeyFields + numPrimaryKeys, typeEnv, context);
IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spPc = splitProviderAndPartitionConstraintsForDataset(
- dataset.getDataverseName(), dataset.getDatasetName(), indexName);
+ dataset.getDataverseName(), dataset.getDatasetName(), indexName, temp);
IBinaryComparatorFactory[] primaryComparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(
dataset, recType, context.getBinaryComparatorFactoryProvider());
@@ -812,7 +827,8 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
IAType nestedKeyType = NonTaggedFormatUtil.getNestedSpatialType(keyType.getTypeTag());
Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(
dataset, mdTxnCtx);
- ISearchOperationCallbackFactory searchCallbackFactory = new SecondaryIndexSearchOperationCallbackFactory();
+ ISearchOperationCallbackFactory searchCallbackFactory = temp ? NoOpOperationCallbackFactory.INSTANCE
+ : new SecondaryIndexSearchOperationCallbackFactory();
RTreeSearchOperatorDescriptor rtreeSearchOp;
if (dataset.getDatasetType() == DatasetType.INTERNAL) {
@@ -826,7 +842,7 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
LSMRTreeIOOperationCallbackFactory.INSTANCE, proposeLinearizer(
nestedKeyType.getTypeTag(), comparatorFactories.length),
storageProperties.getBloomFilterFalsePositiveRate(), rtreeFields, btreeFields,
- filterTypeTraits, filterCmpFactories, filterFields), retainInput, retainNull,
+ filterTypeTraits, filterCmpFactories, filterFields, !temp), retainInput, retainNull,
context.getNullWriterFactory(), searchCallbackFactory, minFilterFieldIndexes,
maxFilterFieldIndexes);
@@ -840,7 +856,7 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
proposeLinearizer(nestedKeyType.getTypeTag(), comparatorFactories.length),
getStorageProperties().getBloomFilterFalsePositiveRate(),
new int[] { numNestedSecondaryKeyFields },
- ExternalDatasetsRegistry.INSTANCE.getAndLockDatasetVersion(dataset, this));
+ ExternalDatasetsRegistry.INSTANCE.getAndLockDatasetVersion(dataset, this), !temp);
// Create the operator
rtreeSearchOp = new ExternalRTreeSearchOperatorDescriptor(jobSpec, outputRecDesc,
appContext.getStorageManagerInterface(), appContext.getIndexLifecycleManagerProvider(),
@@ -859,7 +875,7 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
public Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint> getWriteFileRuntime(IDataSink sink,
int[] printColumns, IPrinterFactory[] printerFactories, RecordDescriptor inputDesc) {
FileSplitDataSink fsds = (FileSplitDataSink) sink;
- FileSplitSinkId fssi = (FileSplitSinkId) fsds.getId();
+ FileSplitSinkId fssi = fsds.getId();
FileSplit fs = fssi.getFileSplit();
File outFile = fs.getLocalFile().getFile();
String nodeId = fs.getNodeName();
@@ -875,7 +891,7 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
int[] printColumns, IPrinterFactory[] printerFactories, RecordDescriptor inputDesc, boolean ordered,
JobSpecification spec) throws AlgebricksException {
ResultSetDataSink rsds = (ResultSetDataSink) sink;
- ResultSetSinkId rssId = (ResultSetSinkId) rsds.getId();
+ ResultSetSinkId rssId = rsds.getId();
ResultSetId rsId = rssId.getResultSetId();
ResultWriterOperatorDescriptor resultWriter = null;
@@ -898,7 +914,7 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
Dataset dataset = ((DatasetDataSource) ads).getDataset();
try {
- String indexName = (String) indexId;
+ String indexName = indexId;
Index secondaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
dataset.getDatasetName(), indexName);
if (secondaryIndex != null) {
@@ -983,6 +999,9 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
}
try {
+ boolean temp = dataset.getDatasetDetails().isTemp();
+ isTemporaryDatasetWriteJob = isTemporaryDatasetWriteJob && temp;
+
Index primaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, dataset.getDataverseName(),
dataset.getDatasetName(), dataset.getDatasetName());
String indexName = primaryIndex.getIndexName();
@@ -995,7 +1014,7 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
itemType, context.getBinaryComparatorFactoryProvider());
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = splitProviderAndPartitionConstraintsForDataset(
- dataSource.getId().getDataverseName(), datasetName, indexName);
+ dataSource.getId().getDataverseName(), datasetName, indexName, temp);
IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
long numElementsHint = getCardinalityPerPartitionHint(dataset);
@@ -1022,7 +1041,7 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
LSMBTreeIOOperationCallbackFactory.INSTANCE,
storageProperties.getBloomFilterFalsePositiveRate(), true, filterTypeTraits,
- filterCmpFactories, btreeFields, filterFields));
+ filterCmpFactories, btreeFields, filterFields, !temp));
return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(btreeBulkLoad,
splitsAndConstraint.second);
} catch (MetadataException me) {
@@ -1042,6 +1061,8 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
throw new AlgebricksException("Unknown dataset " + datasetName + " in dataverse "
+ dataSource.getId().getDataverseName());
}
+ boolean temp = dataset.getDatasetDetails().isTemp();
+ isTemporaryDatasetWriteJob = isTemporaryDatasetWriteJob && temp;
int numKeys = keys.size();
int numFilterFields = DatasetUtils.getFilterField(dataset) == null ? 0 : 1;
@@ -1076,7 +1097,7 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
IBinaryComparatorFactory[] comparatorFactories = DatasetUtils.computeKeysBinaryComparatorFactories(dataset,
itemType, context.getBinaryComparatorFactoryProvider());
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = splitProviderAndPartitionConstraintsForDataset(
- dataSource.getId().getDataverseName(), datasetName, indexName);
+ dataSource.getId().getDataverseName(), datasetName, indexName, temp);
// prepare callback
JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId();
@@ -1093,8 +1114,10 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
int[] btreeFields = DatasetUtils.createBTreeFieldsWhenThereisAFilter(dataset);
TransactionSubsystemProvider txnSubsystemProvider = new TransactionSubsystemProvider();
- PrimaryIndexModificationOperationCallbackFactory modificationCallbackFactory = new PrimaryIndexModificationOperationCallbackFactory(
- jobId, datasetId, primaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_BTREE);
+ IModificationOperationCallbackFactory modificationCallbackFactory = temp ? new TempDatasetPrimaryIndexModificationOperationCallbackFactory(
+ jobId, datasetId, primaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_BTREE)
+ : new PrimaryIndexModificationOperationCallbackFactory(jobId, datasetId, primaryKeyFields,
+ txnSubsystemProvider, indexOp, ResourceType.LSM_BTREE);
Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(
dataset, mdTxnCtx);
@@ -1102,7 +1125,7 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
datasetId), compactionInfo.first, compactionInfo.second, new PrimaryIndexOperationTrackerProvider(
dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
LSMBTreeIOOperationCallbackFactory.INSTANCE, storageProperties.getBloomFilterFalsePositiveRate(),
- true, filterTypeTraits, filterCmpFactories, btreeFields, filterFields);
+ true, filterTypeTraits, filterCmpFactories, btreeFields, filterFields, !temp);
IOperatorDescriptor op;
if (bulkload) {
long numElementsHint = getCardinalityPerPartitionHint(dataset);
@@ -1390,7 +1413,7 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
secondaryKeyType.getTypeTag(), indexType, secondaryIndex.getGramLength());
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = splitProviderAndPartitionConstraintsForDataset(
- dataverseName, datasetName, indexName);
+ dataverseName, datasetName, indexName, dataset.getDatasetDetails().isTemp());
// Generate Output Record format
ISerializerDeserializer[] tokenKeyPairFields = new ISerializerDeserializer[numTokenKeyPairFields];
@@ -1476,6 +1499,8 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
if (dataset == null) {
throw new AlgebricksException("Unknown dataset " + datasetName + " in dataverse " + dataverseName);
}
+ boolean temp = dataset.getDatasetDetails().isTemp();
+ isTemporaryDatasetWriteJob = isTemporaryDatasetWriteJob && temp;
int numKeys = primaryKeys.size() + secondaryKeys.size();
int numFilterFields = DatasetUtils.getFilterField(dataset) == null ? 0 : 1;
@@ -1557,15 +1582,16 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = splitProviderAndPartitionConstraintsForDataset(
- dataverseName, datasetName, indexName);
+ dataverseName, datasetName, indexName, temp);
// prepare callback
JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId();
int datasetId = dataset.getDatasetId();
TransactionSubsystemProvider txnSubsystemProvider = new TransactionSubsystemProvider();
- SecondaryIndexModificationOperationCallbackFactory modificationCallbackFactory = new SecondaryIndexModificationOperationCallbackFactory(
+ IModificationOperationCallbackFactory modificationCallbackFactory = temp ? new TempDatasetSecondaryIndexModificationOperationCallbackFactory(
jobId, datasetId, modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp,
- ResourceType.LSM_BTREE);
+ ResourceType.LSM_BTREE) : new SecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
+ modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_BTREE);
Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(
dataset, mdTxnCtx);
@@ -1574,7 +1600,7 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
new SecondaryIndexOperationTrackerProvider(dataset.getDatasetId()),
AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMBTreeIOOperationCallbackFactory.INSTANCE,
storageProperties.getBloomFilterFalsePositiveRate(), false, filterTypeTraits, filterCmpFactories,
- btreeFields, filterFields);
+ btreeFields, filterFields, !temp);
IOperatorDescriptor op;
if (bulkload) {
long numElementsHint = getCardinalityPerPartitionHint(dataset);
@@ -1592,7 +1618,7 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
dataset.getDatasetId()), AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER,
LSMBTreeIOOperationCallbackFactory.INSTANCE,
storageProperties.getBloomFilterFalsePositiveRate(), false, filterTypeTraits,
- filterCmpFactories, btreeFields, filterFields), filterFactory,
+ filterCmpFactories, btreeFields, filterFields, !temp), filterFactory,
modificationCallbackFactory, false, indexName);
}
return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(op, splitsAndConstraint.second);
@@ -1635,6 +1661,8 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
if (dataset == null) {
throw new AlgebricksException("Unknown dataset " + datasetName + " in dataverse " + dataverseName);
}
+ boolean temp = dataset.getDatasetDetails().isTemp();
+ isTemporaryDatasetWriteJob = isTemporaryDatasetWriteJob && temp;
// For tokenization, sorting and loading.
// One token (+ optional partitioning field) + primary keys: [token,
@@ -1754,14 +1782,16 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = splitProviderAndPartitionConstraintsForDataset(
- dataverseName, datasetName, indexName);
+ dataverseName, datasetName, indexName, temp);
// prepare callback
JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId();
int datasetId = dataset.getDatasetId();
TransactionSubsystemProvider txnSubsystemProvider = new TransactionSubsystemProvider();
- SecondaryIndexModificationOperationCallbackFactory modificationCallbackFactory = new SecondaryIndexModificationOperationCallbackFactory(
+ IModificationOperationCallbackFactory modificationCallbackFactory = temp ? new TempDatasetSecondaryIndexModificationOperationCallbackFactory(
jobId, datasetId, modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp,
+ ResourceType.LSM_INVERTED_INDEX) : new SecondaryIndexModificationOperationCallbackFactory(jobId,
+ datasetId, modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp,
ResourceType.LSM_INVERTED_INDEX);
Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(
@@ -1775,7 +1805,7 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
LSMInvertedIndexIOOperationCallbackFactory.INSTANCE,
storageProperties.getBloomFilterFalsePositiveRate(), invertedIndexFields, filterTypeTraits,
filterCmpFactories, filterFields, filterFieldsForNonBulkLoadOps,
- invertedIndexFieldsForNonBulkLoadOps);
+ invertedIndexFieldsForNonBulkLoadOps, !temp);
} else {
indexDataFlowFactory = new PartitionedLSMInvertedIndexDataflowHelperFactory(
new AsterixVirtualBufferCacheProvider(dataset.getDatasetId()), compactionInfo.first,
@@ -1784,7 +1814,7 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
LSMInvertedIndexIOOperationCallbackFactory.INSTANCE,
storageProperties.getBloomFilterFalsePositiveRate(), invertedIndexFields, filterTypeTraits,
filterCmpFactories, filterFields, filterFieldsForNonBulkLoadOps,
- invertedIndexFieldsForNonBulkLoadOps);
+ invertedIndexFieldsForNonBulkLoadOps, !temp);
}
IOperatorDescriptor op;
if (bulkload) {
@@ -1816,6 +1846,10 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
boolean bulkload) throws AlgebricksException {
try {
Dataset dataset = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
+
+ boolean temp = dataset.getDatasetDetails().isTemp();
+ isTemporaryDatasetWriteJob = isTemporaryDatasetWriteJob && temp;
+
String itemTypeName = dataset.getItemTypeName();
IAType itemType = MetadataManager.INSTANCE.getDatatype(mdTxnCtx, dataverseName, itemTypeName).getDatatype();
if (itemType.getTypeTag() != ATypeTag.RECORD) {
@@ -1878,7 +1912,7 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
dataset, recType, context.getBinaryComparatorFactoryProvider());
IAsterixApplicationContextInfo appContext = (IAsterixApplicationContextInfo) context.getAppContext();
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint = splitProviderAndPartitionConstraintsForDataset(
- dataverseName, datasetName, indexName);
+ dataverseName, datasetName, indexName, temp);
int[] btreeFields = new int[primaryComparatorFactories.length];
for (int k = 0; k < btreeFields.length; k++) {
btreeFields[k] = k + numSecondaryKeys;
@@ -1902,9 +1936,10 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
JobId jobId = ((JobEventListenerFactory) spec.getJobletEventListenerFactory()).getJobId();
int datasetId = dataset.getDatasetId();
TransactionSubsystemProvider txnSubsystemProvider = new TransactionSubsystemProvider();
- SecondaryIndexModificationOperationCallbackFactory modificationCallbackFactory = new SecondaryIndexModificationOperationCallbackFactory(
+ IModificationOperationCallbackFactory modificationCallbackFactory = temp ? new TempDatasetSecondaryIndexModificationOperationCallbackFactory(
jobId, datasetId, modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp,
- ResourceType.LSM_RTREE);
+ ResourceType.LSM_RTREE) : new SecondaryIndexModificationOperationCallbackFactory(jobId, datasetId,
+ modificationCallbackPrimaryKeyFields, txnSubsystemProvider, indexOp, ResourceType.LSM_RTREE);
Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo = DatasetUtils.getMergePolicyFactory(
dataset, mdTxnCtx);
@@ -1915,7 +1950,7 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
AsterixRuntimeComponentsProvider.RUNTIME_PROVIDER, LSMRTreeIOOperationCallbackFactory.INSTANCE,
proposeLinearizer(nestedKeyType.getTypeTag(), comparatorFactories.length),
storageProperties.getBloomFilterFalsePositiveRate(), rtreeFields, btreeFields, filterTypeTraits,
- filterCmpFactories, filterFields);
+ filterCmpFactories, filterFields, !temp);
IOperatorDescriptor op;
if (bulkload) {
long numElementsHint = getCardinalityPerPartitionHint(dataset);
@@ -1935,7 +1970,7 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
LSMRTreeIOOperationCallbackFactory.INSTANCE, proposeLinearizer(
nestedKeyType.getTypeTag(), comparatorFactories.length), storageProperties
.getBloomFilterFalsePositiveRate(), rtreeFields, btreeFields, filterTypeTraits,
- filterCmpFactories, filterFields), filterFactory,
+ filterCmpFactories, filterFields, !temp), filterFactory,
modificationCallbackFactory, false, indexName);
}
return new Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>(op, splitsAndConstraint.second);
@@ -1992,8 +2027,8 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
}
public Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitProviderAndPartitionConstraintsForDataset(
- String dataverseName, String datasetName, String targetIdxName) throws AlgebricksException {
- FileSplit[] splits = splitsForDataset(mdTxnCtx, dataverseName, datasetName, targetIdxName);
+ String dataverseName, String datasetName, String targetIdxName, boolean temp) throws AlgebricksException {
+ FileSplit[] splits = splitsForDataset(mdTxnCtx, dataverseName, datasetName, targetIdxName, temp);
return splitProviderAndPartitionConstraints(splits);
}
@@ -2038,7 +2073,7 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
}
private FileSplit[] splitsForDataset(MetadataTransactionContext mdTxnCtx, String dataverseName, String datasetName,
- String targetIdxName) throws AlgebricksException {
+ String targetIdxName, boolean temp) throws AlgebricksException {
try {
File relPathFile = new File(getRelativePath(dataverseName, datasetName + "_idx_" + targetIdxName));
Dataset dataset = MetadataManager.INSTANCE.getDataset(mdTxnCtx, dataverseName, datasetName);
@@ -2065,19 +2100,14 @@ public class AqlMetadataProvider implements IMetadataProvider<AqlSourceId, Strin
String[] ioDevices = AsterixClusterProperties.INSTANCE.getIODevices(nd);
for (int j = 0; j < nodeStores.length; j++) {
for (int k = 0; k < numIODevices; k++) {
- File f = new File(ioDevices[k] + File.separator + nodeStores[j] + File.separator
- + relPathFile);
+ File f = new File(ioDevices[k] + File.separator + nodeStores[j]
+ + (temp ? (File.separator + "temp") : "") + File.separator + relPathFile);
splitArray.add(new FileSplit(nd, new FileReference(f), k));
}
}
}
}
- FileSplit[] splits = new FileSplit[splitArray.size()];
- int i = 0;
- for (FileSplit fs : splitArray) {
- splits[i++] = fs;
- }
- return splits;
+ return splitArray.toArray(new FileSplit[0]);
} catch (MetadataException me) {
throw new AlgebricksException(me);
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ccd67fe8/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/ExternalDatasetDetails.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/ExternalDatasetDetails.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/ExternalDatasetDetails.java
index eb83159..5f7fb74 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/ExternalDatasetDetails.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/ExternalDatasetDetails.java
@@ -46,6 +46,7 @@ public class ExternalDatasetDetails implements IDatasetDetails {
private final String adapter;
private final Map<String, String> properties;
private final String nodeGroupName;
+ private final long addToCacheTime;
private Date lastRefreshTime;
private ExternalDatasetTransactionState state;
protected String compactionPolicy;
@@ -57,6 +58,7 @@ public class ExternalDatasetDetails implements IDatasetDetails {
this.properties = properties;
this.adapter = adapter;
this.nodeGroupName = nodeGroupName;
+ this.addToCacheTime = System.currentTimeMillis();
this.lastRefreshTime = lastRefreshTime;
this.state = state;
this.compactionPolicy = compactionPolicy;
@@ -196,6 +198,16 @@ public class ExternalDatasetDetails implements IDatasetDetails {
return nodeGroupName;
}
+ @Override
+ public boolean isTemp() {
+ return false;
+ }
+
+ @Override
+ public long getLastAccessTime() {
+ return addToCacheTime;
+ }
+
public Date getTimestamp() {
return lastRefreshTime;
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ccd67fe8/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/InternalDatasetDetails.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/InternalDatasetDetails.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/InternalDatasetDetails.java
index 507a9e3..4ce8593 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/InternalDatasetDetails.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entities/InternalDatasetDetails.java
@@ -60,13 +60,15 @@ public class InternalDatasetDetails implements IDatasetDetails {
protected final boolean autogenerated;
protected final String compactionPolicy;
protected final Map<String, String> compactionPolicyProperties;
+ protected final boolean temp;
+ protected long lastAccessTime;
protected final List<String> filterField;
public static final String FILTER_FIELD_NAME = "FilterField";
public InternalDatasetDetails(FileStructure fileStructure, PartitioningStrategy partitioningStrategy,
List<List<String>> partitioningKey, List<List<String>> primaryKey, List<IAType> primaryKeyType,
String groupName, boolean autogenerated, String compactionPolicy,
- Map<String, String> compactionPolicyProperties, List<String> filterField) {
+ Map<String, String> compactionPolicyProperties, List<String> filterField, boolean temp) {
this.fileStructure = fileStructure;
this.partitioningStrategy = partitioningStrategy;
this.partitioningKeys = partitioningKey;
@@ -77,6 +79,8 @@ public class InternalDatasetDetails implements IDatasetDetails {
this.compactionPolicy = compactionPolicy;
this.compactionPolicyProperties = compactionPolicyProperties;
this.filterField = filterField;
+ this.temp = temp;
+ this.lastAccessTime = System.currentTimeMillis();
}
@Override
@@ -124,10 +128,21 @@ public class InternalDatasetDetails implements IDatasetDetails {
@Override
public DatasetType getDatasetType() {
+ lastAccessTime = System.currentTimeMillis();
return DatasetType.INTERNAL;
}
@Override
+ public long getLastAccessTime() {
+ return lastAccessTime;
+ }
+
+ @Override
+ public boolean isTemp() {
+ return temp;
+ }
+
+ @Override
public void writeDatasetDetailsRecordType(DataOutput out) throws HyracksDataException {
IARecordBuilder internalRecordBuilder = new RecordBuilder();
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ccd67fe8/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java
index 2f7185a..ed6fca4 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java
@@ -75,9 +75,9 @@ public class DatasetTupleTranslator extends AbstractTupleTranslator<Dataset> {
public static final int DATASET_PAYLOAD_TUPLE_FIELD_INDEX = 2;
@SuppressWarnings("unchecked")
- private ISerializerDeserializer<ARecord> recordSerDes = AqlSerializerDeserializerProvider.INSTANCE
+ private final ISerializerDeserializer<ARecord> recordSerDes = AqlSerializerDeserializerProvider.INSTANCE
.getSerializerDeserializer(MetadataRecordTypes.DATASET_RECORDTYPE);
- private AMutableInt32 aInt32;
+ private final AMutableInt32 aInt32;
protected ISerializerDeserializer<AInt32> aInt32Serde;
@SuppressWarnings("unchecked")
@@ -94,7 +94,7 @@ public class DatasetTupleTranslator extends AbstractTupleTranslator<Dataset> {
int recordLength = frameTuple.getFieldLength(DATASET_PAYLOAD_TUPLE_FIELD_INDEX);
ByteArrayInputStream stream = new ByteArrayInputStream(serRecord, recordStartOffset, recordLength);
DataInput in = new DataInputStream(stream);
- ARecord datasetRecord = (ARecord) recordSerDes.deserialize(in);
+ ARecord datasetRecord = recordSerDes.deserialize(in);
return createDatasetFromARecord(datasetRecord);
}
@@ -177,10 +177,11 @@ public class DatasetTupleTranslator extends AbstractTupleTranslator<Dataset> {
}
}
+ // Temporary dataset only lives in the compiler therefore the temp field is false.
+ // DatasetTupleTranslator always read from the metadata node, so the temp flag should be always false.
datasetDetails = new InternalDatasetDetails(fileStructure, partitioningStrategy, partitioningKey,
partitioningKey, partitioningKeyType, groupName, autogenerated, compactionPolicy,
- compactionPolicyProperties, filterField);
-
+ compactionPolicyProperties, filterField, false);
break;
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ccd67fe8/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/external/ExternalLoopkupOperatorDiscriptor.java
----------------------------------------------------------------------
diff --git a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/external/ExternalLoopkupOperatorDiscriptor.java b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/external/ExternalLoopkupOperatorDiscriptor.java
index 99c747b..e7df93f 100644
--- a/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/external/ExternalLoopkupOperatorDiscriptor.java
+++ b/asterix-metadata/src/main/java/edu/uci/ics/asterix/metadata/external/ExternalLoopkupOperatorDiscriptor.java
@@ -16,7 +16,6 @@ package edu.uci.ics.asterix.metadata.external;
import java.nio.ByteBuffer;
-import edu.uci.ics.asterix.transaction.management.opcallbacks.SecondaryIndexSearchOperationCallbackFactory;
import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
import edu.uci.ics.hyracks.api.dataflow.IOperatorNodePushable;
import edu.uci.ics.hyracks.api.dataflow.value.INullWriterFactory;
@@ -39,8 +38,8 @@ import edu.uci.ics.hyracks.storage.common.IStorageManagerInterface;
public class ExternalLoopkupOperatorDiscriptor extends AbstractTreeIndexOperatorDescriptor {
private static final long serialVersionUID = 1L;
- private IControlledAdapterFactory adapterFactory;
- private INullWriterFactory iNullWriterFactory;
+ private final IControlledAdapterFactory adapterFactory;
+ private final INullWriterFactory iNullWriterFactory;
public ExternalLoopkupOperatorDiscriptor(IOperatorDescriptorRegistry spec,
IControlledAdapterFactory adapterFactory, RecordDescriptor outRecDesc,
@@ -53,7 +52,7 @@ public class ExternalLoopkupOperatorDiscriptor extends AbstractTreeIndexOperator
FilesIndexDescription.EXTERNAL_FILE_INDEX_TYPE_TRAITS,
FilesIndexDescription.FILES_INDEX_COMP_FACTORIES, FilesIndexDescription.BLOOM_FILTER_FIELDS,
externalFilesIndexDataFlowHelperFactory, null, propagateInput, retainNull, iNullWriterFactory, null,
- new SecondaryIndexSearchOperationCallbackFactory(), null);
+ searchOpCallbackFactory, null);
this.adapterFactory = adapterFactory;
this.iNullWriterFactory = iNullWriterFactory;
}
@@ -69,7 +68,7 @@ public class ExternalLoopkupOperatorDiscriptor extends AbstractTreeIndexOperator
this);
return new AbstractUnaryInputUnaryOutputOperatorNodePushable() {
// The adapter that uses the file index along with the coming tuples to access files in HDFS
- private IControlledAdapter adapter = adapterFactory.createAdapter(ctx, fileIndexAccessor,
+ private final IControlledAdapter adapter = adapterFactory.createAdapter(ctx, fileIndexAccessor,
recordDescProvider.getInputRecordDescriptor(getActivityId(), 0));
@Override
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ccd67fe8/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/TempDatasetIndexModificationOperationCallback.java
----------------------------------------------------------------------
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/TempDatasetIndexModificationOperationCallback.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/TempDatasetIndexModificationOperationCallback.java
new file mode 100644
index 0000000..e539bb8
--- /dev/null
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/TempDatasetIndexModificationOperationCallback.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.transaction.management.opcallbacks;
+
+import edu.uci.ics.asterix.common.transactions.ILockManager;
+import edu.uci.ics.asterix.common.transactions.ITransactionContext;
+import edu.uci.ics.asterix.common.transactions.ITransactionSubsystem;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
+import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallback;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
+
+/**
+ * This class is the operation callback for temporary datasets.
+ * A temporary dataset does not require any lock and does not generate any write-ahead update and commit log
+ * but generates flush log and job commit log.
+ * The "before" and "found" method in this callback is empty so that no locking is requested for accessing a temporary
+ * dataset and no write-ahead log is written for update operations.
+ */
+public class TempDatasetIndexModificationOperationCallback extends AbstractIndexModificationOperationCallback implements
+ IModificationOperationCallback {
+
+ public TempDatasetIndexModificationOperationCallback(int datasetId, int[] primaryKeyFields,
+ ITransactionContext txnCtx, ILockManager lockManager, ITransactionSubsystem txnSubsystem, long resourceId,
+ byte resourceType, IndexOperation indexOp) {
+ super(datasetId, primaryKeyFields, txnCtx, lockManager, txnSubsystem, resourceId, resourceType, indexOp);
+ }
+
+ @Override
+ public void before(ITupleReference tuple) throws HyracksDataException {
+
+ }
+
+ @Override
+ public void found(ITupleReference before, ITupleReference after) throws HyracksDataException {
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ccd67fe8/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java
new file mode 100644
index 0000000..535672e
--- /dev/null
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java
@@ -0,0 +1,67 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.transaction.management.opcallbacks;
+
+import edu.uci.ics.asterix.common.context.ITransactionSubsystemProvider;
+import edu.uci.ics.asterix.common.exceptions.ACIDException;
+import edu.uci.ics.asterix.common.transactions.AbstractOperationCallback;
+import edu.uci.ics.asterix.common.transactions.AbstractOperationCallbackFactory;
+import edu.uci.ics.asterix.common.transactions.ITransactionContext;
+import edu.uci.ics.asterix.common.transactions.ITransactionSubsystem;
+import edu.uci.ics.asterix.common.transactions.JobId;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManager;
+import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallback;
+import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
+
+public class TempDatasetPrimaryIndexModificationOperationCallbackFactory extends AbstractOperationCallbackFactory implements
+ IModificationOperationCallbackFactory {
+
+ private static final long serialVersionUID = 1L;
+ private final IndexOperation indexOp;
+
+ public TempDatasetPrimaryIndexModificationOperationCallbackFactory(JobId jobId, int datasetId, int[] primaryKeyFields,
+ ITransactionSubsystemProvider txnSubsystemProvider, IndexOperation indexOp, byte resourceType) {
+ super(jobId, datasetId, primaryKeyFields, txnSubsystemProvider, resourceType);
+ this.indexOp = indexOp;
+ }
+
+ @Override
+ public IModificationOperationCallback createModificationOperationCallback(long resourceId, Object resource,
+ IHyracksTaskContext ctx) throws HyracksDataException {
+ ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx);
+ IIndexLifecycleManager indexLifeCycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
+ .getIndexLifecycleManager();
+ ILSMIndex index = (ILSMIndex) indexLifeCycleManager.getIndex(resourceId);
+ if (index == null) {
+ throw new HyracksDataException("Index(id:" + resourceId + ") is not registered.");
+ }
+
+ try {
+ ITransactionContext txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(jobId, false);
+ IModificationOperationCallback modCallback = new TempDatasetIndexModificationOperationCallback(datasetId,
+ primaryKeyFields, txnCtx, txnSubsystem.getLockManager(), txnSubsystem, resourceId, resourceType,
+ indexOp);
+ txnCtx.registerIndexAndCallback(resourceId, index, (AbstractOperationCallback) modCallback, true);
+ return modCallback;
+ } catch (ACIDException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ccd67fe8/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java
new file mode 100644
index 0000000..c47bd38
--- /dev/null
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java
@@ -0,0 +1,67 @@
+/*
+ * Copyright 2009-2013 by The Regents of the University of California
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * you may obtain a copy of the License from
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package edu.uci.ics.asterix.transaction.management.opcallbacks;
+
+import edu.uci.ics.asterix.common.context.ITransactionSubsystemProvider;
+import edu.uci.ics.asterix.common.exceptions.ACIDException;
+import edu.uci.ics.asterix.common.transactions.AbstractOperationCallback;
+import edu.uci.ics.asterix.common.transactions.AbstractOperationCallbackFactory;
+import edu.uci.ics.asterix.common.transactions.ITransactionContext;
+import edu.uci.ics.asterix.common.transactions.ITransactionSubsystem;
+import edu.uci.ics.asterix.common.transactions.JobId;
+import edu.uci.ics.hyracks.api.context.IHyracksTaskContext;
+import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
+import edu.uci.ics.hyracks.storage.am.common.api.IIndexLifecycleManager;
+import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallback;
+import edu.uci.ics.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
+import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOperation;
+import edu.uci.ics.hyracks.storage.am.lsm.common.api.ILSMIndex;
+
+public class TempDatasetSecondaryIndexModificationOperationCallbackFactory extends AbstractOperationCallbackFactory implements
+ IModificationOperationCallbackFactory {
+
+ private static final long serialVersionUID = 1L;
+ private final IndexOperation indexOp;
+
+ public TempDatasetSecondaryIndexModificationOperationCallbackFactory(JobId jobId, int datasetId, int[] primaryKeyFields,
+ ITransactionSubsystemProvider txnSubsystemProvider, IndexOperation indexOp, byte resourceType) {
+ super(jobId, datasetId, primaryKeyFields, txnSubsystemProvider, resourceType);
+ this.indexOp = indexOp;
+ }
+
+ @Override
+ public IModificationOperationCallback createModificationOperationCallback(long resourceId, Object resource,
+ IHyracksTaskContext ctx) throws HyracksDataException {
+ ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx);
+ IIndexLifecycleManager indexLifeCycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
+ .getIndexLifecycleManager();
+ ILSMIndex index = (ILSMIndex) indexLifeCycleManager.getIndex(resourceId);
+ if (index == null) {
+ throw new HyracksDataException("Index(id:" + resourceId + ") is not registered.");
+ }
+
+ try {
+ ITransactionContext txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(jobId, false);
+ IModificationOperationCallback modCallback = new TempDatasetIndexModificationOperationCallback(datasetId,
+ primaryKeyFields, txnCtx, txnSubsystem.getLockManager(), txnSubsystem, resourceId, resourceType,
+ indexOp);
+ txnCtx.registerIndexAndCallback(resourceId, index, (AbstractOperationCallback) modCallback, false);
+ return modCallback;
+ } catch (ACIDException e) {
+ throw new HyracksDataException(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ccd67fe8/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/ExternalBTreeLocalResourceMetadata.java
----------------------------------------------------------------------
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/ExternalBTreeLocalResourceMetadata.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/ExternalBTreeLocalResourceMetadata.java
index 7d699b3..35140e6 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/ExternalBTreeLocalResourceMetadata.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/ExternalBTreeLocalResourceMetadata.java
@@ -44,13 +44,20 @@ public class ExternalBTreeLocalResourceMetadata extends LSMBTreeLocalResourceMet
public ILSMIndex createIndexInstance(IAsterixAppRuntimeContextProvider runtimeContextProvider, String filePath,
int partition) {
FileReference file = new FileReference(new File(filePath));
- LSMBTree lsmBTree = LSMBTreeUtils.createExternalBTree(file, runtimeContextProvider.getBufferCache(),
- runtimeContextProvider.getFileMapManager(), typeTraits, cmpFactories, bloomFilterKeyFields,
- runtimeContextProvider.getBloomFilterFalsePositiveRate(), mergePolicyFactory.createMergePolicy(
- mergePolicyProperties, runtimeContextProvider.getIndexLifecycleManager()),
+ LSMBTree lsmBTree = LSMBTreeUtils.createExternalBTree(
+ file,
+ runtimeContextProvider.getBufferCache(),
+ runtimeContextProvider.getFileMapManager(),
+ typeTraits,
+ cmpFactories,
+ bloomFilterKeyFields,
+ runtimeContextProvider.getBloomFilterFalsePositiveRate(),
+ mergePolicyFactory.createMergePolicy(mergePolicyProperties,
+ runtimeContextProvider.getIndexLifecycleManager()),
new BaseOperationTracker((DatasetLifecycleManager) runtimeContextProvider.getIndexLifecycleManager(),
- datasetID, ((DatasetLifecycleManager) runtimeContextProvider.getIndexLifecycleManager()).getDatasetInfo(datasetID)), runtimeContextProvider.getLSMIOScheduler(),
- LSMBTreeIOOperationCallbackFactory.INSTANCE.createIOOperationCallback(), -1);
+ datasetID, ((DatasetLifecycleManager) runtimeContextProvider.getIndexLifecycleManager())
+ .getDatasetInfo(datasetID)), runtimeContextProvider.getLSMIOScheduler(),
+ LSMBTreeIOOperationCallbackFactory.INSTANCE.createIOOperationCallback(), -1, true);
return lsmBTree;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ccd67fe8/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/ExternalBTreeWithBuddyLocalResourceMetadata.java
----------------------------------------------------------------------
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/ExternalBTreeWithBuddyLocalResourceMetadata.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/ExternalBTreeWithBuddyLocalResourceMetadata.java
index ad4ff0c..c9e7f33 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/ExternalBTreeWithBuddyLocalResourceMetadata.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/ExternalBTreeWithBuddyLocalResourceMetadata.java
@@ -57,12 +57,19 @@ public class ExternalBTreeWithBuddyLocalResourceMetadata extends AbstractLSMLoca
public ILSMIndex createIndexInstance(IAsterixAppRuntimeContextProvider runtimeContextProvider, String filePath,
int partition) throws HyracksDataException {
FileReference file = new FileReference(new File(filePath));
- return LSMBTreeUtils.createExternalBTreeWithBuddy(file, runtimeContextProvider.getBufferCache(),
- runtimeContextProvider.getFileMapManager(), typeTraits, btreeCmpFactories, runtimeContextProvider
- .getBloomFilterFalsePositiveRate(), mergePolicyFactory.createMergePolicy(mergePolicyProperties,
- runtimeContextProvider.getIndexLifecycleManager()), new BaseOperationTracker(
- (DatasetLifecycleManager) runtimeContextProvider.getIndexLifecycleManager(), datasetID, ((DatasetLifecycleManager) runtimeContextProvider.getIndexLifecycleManager()).getDatasetInfo(datasetID)),
- runtimeContextProvider.getLSMIOScheduler(), LSMBTreeWithBuddyIOOperationCallbackFactory.INSTANCE
- .createIOOperationCallback(), buddyBtreeFields, -1);
+ return LSMBTreeUtils.createExternalBTreeWithBuddy(
+ file,
+ runtimeContextProvider.getBufferCache(),
+ runtimeContextProvider.getFileMapManager(),
+ typeTraits,
+ btreeCmpFactories,
+ runtimeContextProvider.getBloomFilterFalsePositiveRate(),
+ mergePolicyFactory.createMergePolicy(mergePolicyProperties,
+ runtimeContextProvider.getIndexLifecycleManager()),
+ new BaseOperationTracker((DatasetLifecycleManager) runtimeContextProvider.getIndexLifecycleManager(),
+ datasetID, ((DatasetLifecycleManager) runtimeContextProvider.getIndexLifecycleManager())
+ .getDatasetInfo(datasetID)), runtimeContextProvider.getLSMIOScheduler(),
+ LSMBTreeWithBuddyIOOperationCallbackFactory.INSTANCE.createIOOperationCallback(), buddyBtreeFields, -1,
+ true);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ccd67fe8/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/ExternalRTreeLocalResourceMetadata.java
----------------------------------------------------------------------
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/ExternalRTreeLocalResourceMetadata.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/ExternalRTreeLocalResourceMetadata.java
index 7c4a437..5c67274 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/ExternalRTreeLocalResourceMetadata.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/ExternalRTreeLocalResourceMetadata.java
@@ -54,14 +54,23 @@ public class ExternalRTreeLocalResourceMetadata extends LSMRTreeLocalResourceMet
int partition) throws HyracksDataException {
FileReference file = new FileReference(new File(filePath));
try {
- return LSMRTreeUtils.createExternalRTree(file, runtimeContextProvider.getBufferCache(),
- runtimeContextProvider.getFileMapManager(), typeTraits, rtreeCmpFactories, btreeCmpFactories,
- valueProviderFactories, rtreePolicyType, runtimeContextProvider.getBloomFilterFalsePositiveRate(),
+ return LSMRTreeUtils.createExternalRTree(
+ file,
+ runtimeContextProvider.getBufferCache(),
+ runtimeContextProvider.getFileMapManager(),
+ typeTraits,
+ rtreeCmpFactories,
+ btreeCmpFactories,
+ valueProviderFactories,
+ rtreePolicyType,
+ runtimeContextProvider.getBloomFilterFalsePositiveRate(),
mergePolicyFactory.createMergePolicy(mergePolicyProperties,
- runtimeContextProvider.getIndexLifecycleManager()), new BaseOperationTracker(
- (DatasetLifecycleManager) runtimeContextProvider.getIndexLifecycleManager(), datasetID, ((DatasetLifecycleManager) runtimeContextProvider.getIndexLifecycleManager()).getDatasetInfo(datasetID)),
- runtimeContextProvider.getLSMIOScheduler(), LSMRTreeIOOperationCallbackFactory.INSTANCE
- .createIOOperationCallback(), linearizeCmpFactory, btreeFields, -1);
+ runtimeContextProvider.getIndexLifecycleManager()),
+ new BaseOperationTracker((DatasetLifecycleManager) runtimeContextProvider
+ .getIndexLifecycleManager(), datasetID, ((DatasetLifecycleManager) runtimeContextProvider
+ .getIndexLifecycleManager()).getDatasetInfo(datasetID)), runtimeContextProvider
+ .getLSMIOScheduler(), LSMRTreeIOOperationCallbackFactory.INSTANCE
+ .createIOOperationCallback(), linearizeCmpFactory, btreeFields, -1, true);
} catch (TreeIndexException e) {
throw new HyracksDataException(e);
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ccd67fe8/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMBTreeLocalResourceMetadata.java
----------------------------------------------------------------------
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMBTreeLocalResourceMetadata.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMBTreeLocalResourceMetadata.java
index 76dd03a..08938be 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMBTreeLocalResourceMetadata.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMBTreeLocalResourceMetadata.java
@@ -62,15 +62,23 @@ public class LSMBTreeLocalResourceMetadata extends AbstractLSMLocalResourceMetad
int partition) {
FileReference file = new FileReference(new File(filePath));
List<IVirtualBufferCache> virtualBufferCaches = runtimeContextProvider.getVirtualBufferCaches(datasetID);
- LSMBTree lsmBTree = LSMBTreeUtils.createLSMTree(virtualBufferCaches, file, runtimeContextProvider
- .getBufferCache(), runtimeContextProvider.getFileMapManager(), typeTraits, cmpFactories,
- bloomFilterKeyFields, runtimeContextProvider.getBloomFilterFalsePositiveRate(), mergePolicyFactory
- .createMergePolicy(mergePolicyProperties, runtimeContextProvider.getIndexLifecycleManager()),
+ LSMBTree lsmBTree = LSMBTreeUtils.createLSMTree(
+ virtualBufferCaches,
+ file,
+ runtimeContextProvider.getBufferCache(),
+ runtimeContextProvider.getFileMapManager(),
+ typeTraits,
+ cmpFactories,
+ bloomFilterKeyFields,
+ runtimeContextProvider.getBloomFilterFalsePositiveRate(),
+ mergePolicyFactory.createMergePolicy(mergePolicyProperties,
+ runtimeContextProvider.getIndexLifecycleManager()),
isPrimary ? runtimeContextProvider.getLSMBTreeOperationTracker(datasetID) : new BaseOperationTracker(
- (DatasetLifecycleManager) runtimeContextProvider.getIndexLifecycleManager(), datasetID, ((DatasetLifecycleManager) runtimeContextProvider.getIndexLifecycleManager()).getDatasetInfo(datasetID)),
- runtimeContextProvider.getLSMIOScheduler(), LSMBTreeIOOperationCallbackFactory.INSTANCE
- .createIOOperationCallback(), isPrimary, filterTypeTraits, filterCmpFactories, btreeFields,
- filterFields);
+ (DatasetLifecycleManager) runtimeContextProvider.getIndexLifecycleManager(), datasetID,
+ ((DatasetLifecycleManager) runtimeContextProvider.getIndexLifecycleManager())
+ .getDatasetInfo(datasetID)), runtimeContextProvider.getLSMIOScheduler(),
+ LSMBTreeIOOperationCallbackFactory.INSTANCE.createIOOperationCallback(), isPrimary, filterTypeTraits,
+ filterCmpFactories, btreeFields, filterFields, true);
return lsmBTree;
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ccd67fe8/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMInvertedIndexLocalResourceMetadata.java
----------------------------------------------------------------------
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMInvertedIndexLocalResourceMetadata.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMInvertedIndexLocalResourceMetadata.java
index 7d43626..63cf858 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMInvertedIndexLocalResourceMetadata.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMInvertedIndexLocalResourceMetadata.java
@@ -88,10 +88,12 @@ public class LSMInvertedIndexLocalResourceMetadata extends AbstractLSMLocalResou
mergePolicyFactory.createMergePolicy(mergePolicyProperties,
runtimeContextProvider.getIndexLifecycleManager()),
new BaseOperationTracker((DatasetLifecycleManager) runtimeContextProvider
- .getIndexLifecycleManager(), datasetID, ((DatasetLifecycleManager) runtimeContextProvider.getIndexLifecycleManager()).getDatasetInfo(datasetID)), runtimeContextProvider.getLSMIOScheduler(),
+ .getIndexLifecycleManager(), datasetID,
+ ((DatasetLifecycleManager) runtimeContextProvider.getIndexLifecycleManager())
+ .getDatasetInfo(datasetID)), runtimeContextProvider.getLSMIOScheduler(),
LSMInvertedIndexIOOperationCallbackFactory.INSTANCE.createIOOperationCallback(),
invertedIndexFields, filterTypeTraits, filterCmpFactories, filterFields,
- filterFieldsForNonBulkLoadOps, invertedIndexFieldsForNonBulkLoadOps);
+ filterFieldsForNonBulkLoadOps, invertedIndexFieldsForNonBulkLoadOps, true);
} else {
return InvertedIndexUtils.createLSMInvertedIndex(
virtualBufferCaches,
@@ -107,10 +109,12 @@ public class LSMInvertedIndexLocalResourceMetadata extends AbstractLSMLocalResou
mergePolicyFactory.createMergePolicy(mergePolicyProperties,
runtimeContextProvider.getIndexLifecycleManager()),
new BaseOperationTracker((DatasetLifecycleManager) runtimeContextProvider
- .getIndexLifecycleManager(), datasetID, ((DatasetLifecycleManager) runtimeContextProvider.getIndexLifecycleManager()).getDatasetInfo(datasetID)), runtimeContextProvider.getLSMIOScheduler(),
+ .getIndexLifecycleManager(), datasetID,
+ ((DatasetLifecycleManager) runtimeContextProvider.getIndexLifecycleManager())
+ .getDatasetInfo(datasetID)), runtimeContextProvider.getLSMIOScheduler(),
LSMInvertedIndexIOOperationCallbackFactory.INSTANCE.createIOOperationCallback(),
invertedIndexFields, filterTypeTraits, filterCmpFactories, filterFields,
- filterFieldsForNonBulkLoadOps, invertedIndexFieldsForNonBulkLoadOps);
+ filterFieldsForNonBulkLoadOps, invertedIndexFieldsForNonBulkLoadOps, true);
}
} catch (IndexException e) {
throw new HyracksDataException(e);
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ccd67fe8/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMRTreeLocalResourceMetadata.java
----------------------------------------------------------------------
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMRTreeLocalResourceMetadata.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMRTreeLocalResourceMetadata.java
index 6b0597d..8d0255d 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMRTreeLocalResourceMetadata.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/resource/LSMRTreeLocalResourceMetadata.java
@@ -75,15 +75,25 @@ public class LSMRTreeLocalResourceMetadata extends AbstractLSMLocalResourceMetad
FileReference file = new FileReference(new File(filePath));
List<IVirtualBufferCache> virtualBufferCaches = runtimeContextProvider.getVirtualBufferCaches(datasetID);
try {
- return LSMRTreeUtils.createLSMTree(virtualBufferCaches, file, runtimeContextProvider.getBufferCache(),
- runtimeContextProvider.getFileMapManager(), typeTraits, rtreeCmpFactories, btreeCmpFactories,
- valueProviderFactories, rtreePolicyType, runtimeContextProvider.getBloomFilterFalsePositiveRate(),
+ return LSMRTreeUtils.createLSMTree(
+ virtualBufferCaches,
+ file,
+ runtimeContextProvider.getBufferCache(),
+ runtimeContextProvider.getFileMapManager(),
+ typeTraits,
+ rtreeCmpFactories,
+ btreeCmpFactories,
+ valueProviderFactories,
+ rtreePolicyType,
+ runtimeContextProvider.getBloomFilterFalsePositiveRate(),
mergePolicyFactory.createMergePolicy(mergePolicyProperties,
- runtimeContextProvider.getIndexLifecycleManager()), new BaseOperationTracker(
- (DatasetLifecycleManager) runtimeContextProvider.getIndexLifecycleManager(), datasetID, ((DatasetLifecycleManager) runtimeContextProvider.getIndexLifecycleManager()).getDatasetInfo(datasetID)),
- runtimeContextProvider.getLSMIOScheduler(), LSMRTreeIOOperationCallbackFactory.INSTANCE
+ runtimeContextProvider.getIndexLifecycleManager()),
+ new BaseOperationTracker((DatasetLifecycleManager) runtimeContextProvider
+ .getIndexLifecycleManager(), datasetID, ((DatasetLifecycleManager) runtimeContextProvider
+ .getIndexLifecycleManager()).getDatasetInfo(datasetID)), runtimeContextProvider
+ .getLSMIOScheduler(), LSMRTreeIOOperationCallbackFactory.INSTANCE
.createIOOperationCallback(), linearizeCmpFactory, rtreeFields, btreeFields,
- filterTypeTraits, filterCmpFactories, filterFields);
+ filterTypeTraits, filterCmpFactories, filterFields, true);
} catch (TreeIndexException e) {
throw new HyracksDataException(e);
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/ccd67fe8/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
----------------------------------------------------------------------
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
index 3720f08..8f68926 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
@@ -61,29 +61,29 @@ public class LockManager implements ILockManager, ILifeCycleComponent {
private static final int ESCALATED = 1;
private static final int DONOT_ESCALATE = 2;
- private TransactionSubsystem txnSubsystem;
+ private final TransactionSubsystem txnSubsystem;
//all threads accessing to LockManager's tables such as jobHT and datasetResourceHT
//are serialized through LockTableLatch. All threads waiting the latch will be fairly served
//in FIFO manner when the latch is available.
private final ReadWriteLock lockTableLatch;
private final ReadWriteLock waiterLatch;
- private HashMap<JobId, JobInfo> jobHT;
- private HashMap<DatasetId, DatasetLockInfo> datasetResourceHT;
+ private final HashMap<JobId, JobInfo> jobHT;
+ private final HashMap<DatasetId, DatasetLockInfo> datasetResourceHT;
- private EntityLockInfoManager entityLockInfoManager;
- private EntityInfoManager entityInfoManager;
- private LockWaiterManager lockWaiterManager;
+ private final EntityLockInfoManager entityLockInfoManager;
+ private final EntityInfoManager entityInfoManager;
+ private final LockWaiterManager lockWaiterManager;
- private DeadlockDetector deadlockDetector;
- private TimeOutDetector toutDetector;
- private DatasetId tempDatasetIdObj; //temporary object to avoid object creation
- private JobId tempJobIdObj;
+ private final DeadlockDetector deadlockDetector;
+ private final TimeOutDetector toutDetector;
+ private final DatasetId tempDatasetIdObj; //temporary object to avoid object creation
+ private final JobId tempJobIdObj;
private int tryLockDatasetGranuleRevertOperation;
private LockRequestTracker lockRequestTracker; //for debugging
- private ConsecutiveWakeupContext consecutiveWakeupContext;
+ private final ConsecutiveWakeupContext consecutiveWakeupContext;
public LockManager(TransactionSubsystem txnSubsystem) throws ACIDException {
this.txnSubsystem = txnSubsystem;
@@ -638,7 +638,8 @@ public class LockManager implements ILockManager, ILifeCycleComponent {
}
@Override
- public void unlock(DatasetId datasetId, int entityHashValue, byte lockMode, ITransactionContext txnContext) throws ACIDException {
+ public void unlock(DatasetId datasetId, int entityHashValue, byte lockMode, ITransactionContext txnContext)
+ throws ACIDException {
internalUnlock(datasetId, entityHashValue, txnContext, false);
}
@@ -2010,7 +2011,7 @@ public class LockManager implements ILockManager, ILifeCycleComponent {
Iterator<Entry<JobId, JobInfo>> iter = jobHT.entrySet().iterator();
while (iter.hasNext()) {
- Map.Entry<JobId, JobInfo> pair = (Map.Entry<JobId, JobInfo>) iter.next();
+ Map.Entry<JobId, JobInfo> pair = iter.next();
jobInfo = pair.getValue();
waiterObjId = jobInfo.getFirstWaitingResource();
while (waiterObjId != -1) {
@@ -2215,7 +2216,7 @@ public class LockManager implements ILockManager, ILifeCycleComponent {
tempJobIdObj.setId(logRecord.getJobId());
txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(tempJobIdObj, false);
txnCtx.notifyOptracker(true);
- ((LogPage) logPage).notifyJobTerminator();
+ logPage.notifyJobTerminator();
}
logRecord = logPageReader.next();
}
@@ -2281,4 +2282,4 @@ class ConsecutiveWakeupContext {
}
-}
+}
\ No newline at end of file