You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by al...@apache.org on 2023/04/04 01:22:15 UTC
[asterixdb] branch master updated: [ASTERIXDB-3144][RT] Make commit runtime support multiple partitions
This is an automated email from the ASF dual-hosted git repository.
alsuliman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git
The following commit(s) were added to refs/heads/master by this push:
new dfcb99f094 [ASTERIXDB-3144][RT] Make commit runtime support multiple partitions
dfcb99f094 is described below
commit dfcb99f094291b511fe51c0890cf7fd7ee85fafe
Author: Ali Alsuliman <al...@gmail.com>
AuthorDate: Sun Apr 2 01:33:26 2023 -0700
[ASTERIXDB-3144][RT] Make commit runtime support multiple partitions
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
This patch changes the commit runtime to support
operating on multiple partitions. With this change, a single commit push
runtime can process data belonging to multiple partitions.
This is a step towards achieving compute/storage separation.
Change-Id: Ia06f125465d667331943e3ed132f81e6cf77be71
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17458
Reviewed-by: Murtadha Hubail <mh...@apache.org>
Tested-by: Murtadha Hubail <mh...@apache.org>
---
.../asterix/app/bootstrap/TestNodeController.java | 2 +-
.../apache/asterix/test/dataflow/TestDataset.java | 7 ++++---
.../apache/asterix/metadata/entities/Dataset.java | 7 ++++++-
.../management/runtime/CommitRuntime.java | 20 ++++++++++++++++----
.../management/runtime/CommitRuntimeFactory.java | 10 +++++++---
5 files changed, 34 insertions(+), 12 deletions(-)
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
index a65c898acb..068c0ee18f 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
@@ -855,7 +855,7 @@ public class TestNodeController {
pkFieldsInCommitOp[i] = start++;
}
CommitRuntime commitOp = new CommitRuntime(ctx, getTxnJobId(ctx), dataset.getDatasetId(), pkFieldsInCommitOp,
- true, ctx.getTaskAttemptId().getTaskId().getPartition(), true);
+ true, ctx.getTaskAttemptId().getTaskId().getPartition(), true, null, null);
insertOp.setOutputFrameWriter(0, commitOp, upsertOutRecDesc);
commitOp.setInputRecordDescriptor(0, upsertOutRecDesc);
return Pair.of(insertOp, commitOp);
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestDataset.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestDataset.java
index f87757c389..f684405e74 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestDataset.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestDataset.java
@@ -58,11 +58,12 @@ public class TestDataset extends Dataset {
public IPushRuntimeFactory getCommitRuntimeFactory(MetadataProvider metadataProvider, int[] keyFieldPermutation,
boolean isSink) throws AlgebricksException {
return new IPushRuntimeFactory() {
+
@Override
public IPushRuntime[] createPushRuntime(IHyracksTaskContext ctx) throws HyracksDataException {
- return new IPushRuntime[] {
- new CommitRuntime(ctx, new TxnId(ctx.getJobletContext().getJobId().getId()), getDatasetId(),
- keyFieldPermutation, true, ctx.getTaskAttemptId().getTaskId().getPartition(), true) };
+ return new IPushRuntime[] { new CommitRuntime(ctx, new TxnId(ctx.getJobletContext().getJobId().getId()),
+ getDatasetId(), keyFieldPermutation, true, ctx.getTaskAttemptId().getTaskId().getPartition(),
+ true, null, null) };
}
};
}
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
index c0f2dddff4..4b225ecde7 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
@@ -99,11 +99,13 @@ import org.apache.hyracks.api.client.IHyracksClientConnection;
import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionerFactory;
import org.apache.hyracks.api.dataflow.value.ITypeTraits;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.SourceLocation;
import org.apache.hyracks.api.io.FileSplit;
import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.dataflow.common.data.partition.FieldHashPartitionerFactory;
import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
import org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor.DropOption;
@@ -631,8 +633,11 @@ public class Dataset implements IMetadataEntity<Dataset>, IDataset {
public IPushRuntimeFactory getCommitRuntimeFactory(MetadataProvider metadataProvider,
int[] primaryKeyFieldPermutation, boolean isSink) throws AlgebricksException {
int[] datasetPartitions = getDatasetPartitions(metadataProvider);
+ IBinaryHashFunctionFactory[] pkHashFunFactories = getPrimaryHashFunctionFactories(metadataProvider);
+ ITuplePartitionerFactory partitionerFactory = new FieldHashPartitionerFactory(primaryKeyFieldPermutation,
+ pkHashFunFactories, datasetPartitions.length);
return new CommitRuntimeFactory(datasetId, primaryKeyFieldPermutation, metadataProvider.isWriteTransaction(),
- datasetPartitions, isSink);
+ datasetPartitions, isSink, partitionerFactory);
}
public IFrameOperationCallbackFactory getFrameOpCallbackFactory(MetadataProvider mdProvider) {
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java
index 2692cc7b33..ff5ed5c571 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java
@@ -34,6 +34,8 @@ import org.apache.asterix.common.utils.TransactionUtil;
import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime;
import org.apache.hyracks.api.comm.IFrame;
import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitioner;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionerFactory;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.util.HyracksConstants;
@@ -56,14 +58,18 @@ public class CommitRuntime extends AbstractOneInputOneOutputOneFramePushRuntime
protected final boolean isWriteTransaction;
protected final long[] longHashes;
protected final IHyracksTaskContext ctx;
+ private final int[] datasetPartitions;
+ private final ITuplePartitioner partitioner;
protected final int resourcePartition;
protected ITransactionContext transactionContext;
protected LogRecord logRecord;
protected final boolean isSink;
public CommitRuntime(IHyracksTaskContext ctx, TxnId txnId, int datasetId, int[] primaryKeyFields,
- boolean isWriteTransaction, int resourcePartition, boolean isSink) {
+ boolean isWriteTransaction, int resourcePartition, boolean isSink,
+ ITuplePartitionerFactory partitionerFactory, int[] datasetPartitions) {
this.ctx = ctx;
+ this.datasetPartitions = datasetPartitions;
INcApplicationContext appCtx =
(INcApplicationContext) ctx.getJobletContext().getServiceContext().getApplicationContext();
this.transactionManager = appCtx.getTransactionSubsystem().getTransactionManager();
@@ -75,6 +81,7 @@ public class CommitRuntime extends AbstractOneInputOneOutputOneFramePushRuntime
this.isWriteTransaction = isWriteTransaction;
this.resourcePartition = resourcePartition;
this.isSink = isSink;
+ this.partitioner = partitionerFactory != null ? partitionerFactory.createPartitioner(ctx) : null;
longHashes = new long[2];
}
@@ -102,7 +109,7 @@ public class CommitRuntime extends AbstractOneInputOneOutputOneFramePushRuntime
for (int t = 0; t < nTuple; t++) {
tRef.reset(tAccess, t);
try {
- formLogRecord(buffer, t);
+ formLogRecord(tAccess, t);
logMgr.log(logRecord);
if (!isSink) {
appendTupleToFrame(t);
@@ -130,10 +137,11 @@ public class CommitRuntime extends AbstractOneInputOneOutputOneFramePushRuntime
TransactionUtil.formMarkerLogRecord(logRecord, transactionContext, datasetId, resourcePartition, marker);
}
- protected void formLogRecord(ByteBuffer buffer, int t) {
+ protected void formLogRecord(FrameTupleAccessor accessor, int t) throws HyracksDataException {
int pkHash = computePrimaryKeyHashValue(tRef, primaryKeyFields);
+ int resource = getResourcePartition(accessor, t);
TransactionUtil.formEntityCommitLogRecord(logRecord, transactionContext, datasetId, pkHash, tRef,
- primaryKeyFields, resourcePartition, LogType.ENTITY_COMMIT);
+ primaryKeyFields, resource, LogType.ENTITY_COMMIT);
}
protected int computePrimaryKeyHashValue(ITupleReference tuple, int[] primaryKeyFields) {
@@ -141,6 +149,10 @@ public class CommitRuntime extends AbstractOneInputOneOutputOneFramePushRuntime
return Math.abs((int) longHashes[0]);
}
+ protected int getResourcePartition(FrameTupleAccessor tupleAccessor, int tuple) throws HyracksDataException {
+ return partitioner != null ? datasetPartitions[partitioner.partition(tupleAccessor, tuple)] : resourcePartition;
+ }
+
@Override
public void setInputRecordDescriptor(int index, RecordDescriptor recordDescriptor) {
this.inputRecordDesc = recordDescriptor;
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java
index 708e8dc4c7..269f6862c9 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java
@@ -23,13 +23,15 @@ import org.apache.asterix.common.api.IJobEventListenerFactory;
import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
import org.apache.hyracks.algebricks.runtime.operators.base.AbstractPushRuntimeFactory;
import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionerFactory;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.IJobletEventListenerFactory;
public class CommitRuntimeFactory extends AbstractPushRuntimeFactory {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 2L;
+ protected final ITuplePartitionerFactory partitionerFactory;
protected final int datasetId;
protected final int[] primaryKeyFields;
protected final boolean isWriteTransaction;
@@ -37,7 +39,8 @@ public class CommitRuntimeFactory extends AbstractPushRuntimeFactory {
protected final boolean isSink;
public CommitRuntimeFactory(int datasetId, int[] primaryKeyFields, boolean isWriteTransaction,
- int[] datasetPartitions, boolean isSink) {
+ int[] datasetPartitions, boolean isSink, ITuplePartitionerFactory partitionerFactory) {
+ this.partitionerFactory = partitionerFactory;
this.datasetId = datasetId;
this.primaryKeyFields = primaryKeyFields;
this.isWriteTransaction = isWriteTransaction;
@@ -55,6 +58,7 @@ public class CommitRuntimeFactory extends AbstractPushRuntimeFactory {
IJobletEventListenerFactory fact = ctx.getJobletContext().getJobletEventListenerFactory();
return new IPushRuntime[] { new CommitRuntime(ctx, ((IJobEventListenerFactory) fact).getTxnId(datasetId),
datasetId, primaryKeyFields, isWriteTransaction,
- datasetPartitions[ctx.getTaskAttemptId().getTaskId().getPartition()], isSink) };
+ datasetPartitions[ctx.getTaskAttemptId().getTaskId().getPartition()], isSink, partitionerFactory,
+ datasetPartitions) };
}
}