You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by am...@apache.org on 2016/07/22 19:37:01 UTC
[2/4] asterixdb git commit: Add Test NodeController,
Test Data Generator, and Marker Logs
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7575785a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedWithMetaDataFlowController.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedWithMetaDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedWithMetaDataFlowController.java
index 44aac60..790289a 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedWithMetaDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedWithMetaDataFlowController.java
@@ -32,8 +32,8 @@ public class FeedWithMetaDataFlowController<T> extends FeedRecordDataFlowControl
public FeedWithMetaDataFlowController(IHyracksTaskContext ctx, FeedTupleForwarder tupleForwarder,
FeedLogManager feedLogManager, int numOfOutputFields, IRecordWithMetadataParser<T> dataParser,
- IRecordReader<T> recordReader) throws HyracksDataException {
- super(ctx, tupleForwarder, feedLogManager, numOfOutputFields, dataParser, recordReader);
+ IRecordReader<T> recordReader, boolean sendMarker) throws HyracksDataException {
+ super(ctx, tupleForwarder, feedLogManager, numOfOutputFields, dataParser, recordReader, sendMarker);
}
@Override
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7575785a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java
index 2e1c83f..6159908 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FeedRuntimeInputHandler.java
@@ -72,11 +72,12 @@ public class FeedRuntimeInputHandler extends AbstractUnaryInputUnaryOutputOperat
IFrameWriter writer, FeedPolicyAccessor fpa, FrameTupleAccessor fta, ConcurrentFramePool framePool)
throws HyracksDataException {
this.writer = writer;
+
this.spiller =
- new FrameSpiller(ctx,
+ fpa.spillToDiskOnCongestion() ? new FrameSpiller(ctx,
connectionId.getFeedId() + "_" + connectionId.getDatasetName() + "_"
+ runtimeId.getFeedRuntimeType() + "_" + runtimeId.getPartition(),
- fpa.getMaxSpillOnDisk());
+ fpa.getMaxSpillOnDisk()) : null;
this.exceptionHandler = new FeedExceptionHandler(ctx, fta);
this.fpa = fpa;
this.framePool = framePool;
@@ -122,7 +123,9 @@ public class FeedRuntimeInputHandler extends AbstractUnaryInputUnaryOutputOperat
LOGGER.log(Level.WARNING, th.getMessage(), th);
}
try {
- spiller.close();
+ if (spiller != null) {
+ spiller.close();
+ }
} catch (Throwable th) {
LOGGER.log(Level.WARNING, th.getMessage(), th);
}
@@ -459,34 +462,34 @@ public class FeedRuntimeInputHandler extends AbstractUnaryInputUnaryOutputOperat
frame = inbox.poll();
if (frame == null) {
// Memory queue is empty. Check spill
- frame = spiller.next();
- while (frame != null) {
- if (consume(frame) != null) {
- // We don't release the frame since this is a spill frame that we didn't get from memory
- // manager
- return;
- }
+ if (spiller != null) {
frame = spiller.next();
+ while (frame != null) {
+ if (consume(frame) != null) {
+ // We don't release the frame since this is a spill frame that we didn't get from memory
+ // manager
+ return;
+ }
+ frame = spiller.next();
+ }
}
writer.flush();
// At this point. We consumed all memory and spilled
// We can't assume the next will be in memory. what if there is 0 memory?
synchronized (mutex) {
frame = inbox.poll();
- if (frame == null) {
- // Nothing in memory
- if (spiller.switchToMemory()) {
- if (poisoned) {
- break;
- }
- if (DEBUG) {
- LOGGER.info("Consumer is going to sleep");
- }
- // Nothing in disk
- mutex.wait();
- if (DEBUG) {
- LOGGER.info("Consumer is waking up");
- }
+ // Nothing in memory
+ if (frame == null && (spiller == null || spiller.switchToMemory())) {
+ if (poisoned) {
+ break;
+ }
+ if (DEBUG) {
+ LOGGER.info("Consumer is going to sleep");
+ }
+ // Nothing in disk
+ mutex.wait();
+ if (DEBUG) {
+ LOGGER.info("Consumer is waking up");
}
}
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7575785a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameSpiller.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameSpiller.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameSpiller.java
index a2f19bb..09e03a3 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameSpiller.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/dataflow/FrameSpiller.java
@@ -28,10 +28,10 @@ import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.util.ArrayDeque;
+import org.apache.asterix.common.utils.StoragePathUtil;
import org.apache.hyracks.api.comm.VSizeFrame;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.log4j.Level;
import org.apache.log4j.Logger;
/**
@@ -49,28 +49,30 @@ public class FrameSpiller {
private final String fileNamePrefix;
private final ArrayDeque<File> files = new ArrayDeque<>();
private final VSizeFrame frame;
- private final int budget; // Max current frames in disk allowed
- private BufferedOutputStream bos; // Current output stream
- private BufferedInputStream bis; // Current input stream
- private File currentWriteFile; // Current write file
- private File currentReadFile; // Current read file
- private int currentWriteCount = 0; // Current file write count
- private int currentReadCount = 0; // Current file read count
- private int totalWriteCount = 0; // Total frames spilled
- private int totalReadCount = 0; // Total frames read
- private int fileCount = 0; // How many spill files?
+ private final int budget; // Max current frames in disk allowed
+ private BufferedOutputStream bos; // Current output stream
+ private BufferedInputStream bis; // Current input stream
+ private File currentWriteFile; // Current write file
+ private File currentReadFile; // Current read file
+ private int currentWriteCount = 0; // Current file write count
+ private int currentReadCount = 0; // Current file read count
+ private int totalWriteCount = 0; // Total frames spilled
+ private int totalReadCount = 0; // Total frames read
+ private int fileCount = 0; // How many spill files?
public FrameSpiller(IHyracksTaskContext ctx, String fileNamePrefix, long budgetInBytes)
throws HyracksDataException {
this.frame = new VSizeFrame(ctx);
this.fileNamePrefix = fileNamePrefix;
- this.budget = (int) (budgetInBytes / ctx.getInitialFrameSize());
-
+ this.budget = (int) Math.min(budgetInBytes / ctx.getInitialFrameSize(), Integer.MAX_VALUE);
+ if (budget <= 0) {
+ throw new HyracksDataException("Invalid budget " + budgetInBytes + ". Budget must be larger than 0");
+ }
}
public void open() throws HyracksDataException {
try {
- this.currentWriteFile = createFile();
+ this.currentWriteFile = StoragePathUtil.createFile(fileNamePrefix, fileCount++);
this.currentReadFile = currentWriteFile;
this.bos = new BufferedOutputStream(new FileOutputStream(currentWriteFile));
this.bis = new BufferedInputStream(new FileInputStream(currentReadFile));
@@ -135,7 +137,7 @@ public class FrameSpiller {
}
public double usedBudget() {
- return ((double) (totalWriteCount - totalReadCount) / (double) budget);
+ return (double) (totalWriteCount - totalReadCount) / (double) budget;
}
public synchronized boolean spill(ByteBuffer frame) throws HyracksDataException {
@@ -150,7 +152,7 @@ public class FrameSpiller {
if (currentWriteCount >= FRAMES_PER_FILE) {
bos.close();
currentWriteCount = 0;
- currentWriteFile = createFile();
+ currentWriteFile = StoragePathUtil.createFile(fileNamePrefix, fileCount++);
files.add(currentWriteFile);
bos = new BufferedOutputStream(new FileOutputStream(currentWriteFile));
}
@@ -161,26 +163,6 @@ public class FrameSpiller {
}
}
- private File createFile() throws HyracksDataException {
- try {
- String fileName = fileNamePrefix + "_" + fileCount++;
- File file = new File(fileName);
- if (!file.exists()) {
- boolean success = file.createNewFile();
- if (!success) {
- throw new HyracksDataException("Unable to create spill file " + fileName);
- } else {
- if (LOGGER.isEnabledFor(Level.INFO)) {
- LOGGER.info("Created spill file " + file.getAbsolutePath());
- }
- }
- }
- return file;
- } catch (Throwable th) {
- throw new HyracksDataException(th);
- }
- }
-
public synchronized void close() {
// Do proper cleanup
if (bos != null) {
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7575785a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java
index 8ee3e2b..9661890 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/IngestionRuntime.java
@@ -28,6 +28,8 @@ import org.apache.asterix.external.feed.dataflow.FeedFrameCollector;
import org.apache.hyracks.api.comm.VSizeFrame;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.util.HyracksConstants;
+import org.apache.hyracks.dataflow.common.util.TaskUtils;
public class IngestionRuntime extends SubscribableRuntime {
@@ -48,8 +50,9 @@ public class IngestionRuntime extends SubscribableRuntime {
dWriter.subscribe(collector);
subscribers.add(collectionRuntime);
if (numSubscribers == 0) {
- ctx.setSharedObject(new VSizeFrame(ctx));
- collectionRuntime.getCtx().setSharedObject(ctx.getSharedObject());
+ TaskUtils.putInSharedMap(HyracksConstants.KEY_MESSAGE, new VSizeFrame(ctx), ctx);
+ TaskUtils.putInSharedMap(HyracksConstants.KEY_MESSAGE,
+ TaskUtils.<VSizeFrame> get(HyracksConstants.KEY_MESSAGE, ctx), collectionRuntime.getCtx());
adapterRuntimeManager.start();
}
numSubscribers++;
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7575785a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java
index 54e17ef..37a42a7 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaComputeNodePushable.java
@@ -40,7 +40,9 @@ import org.apache.hyracks.api.dataflow.IActivity;
import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.util.HyracksConstants;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.common.util.TaskUtils;
import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
/*
@@ -112,7 +114,7 @@ public class FeedMetaComputeNodePushable extends AbstractUnaryInputUnaryOutputOp
this.feedManager = (ActiveManager) ((IAsterixAppRuntimeContext) ctx.getJobletContext().getApplicationContext()
.getApplicationObject()).getFeedManager();
this.message = new VSizeFrame(ctx);
- ctx.setSharedObject(message);
+ TaskUtils.putInSharedMap(HyracksConstants.KEY_MESSAGE, message, ctx);
this.opDesc = feedMetaOperatorDescriptor;
this.recordDescProvider = recordDescProvider;
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7575785a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
index 6f679f7..95bebad 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/FeedMetaStoreNodePushable.java
@@ -41,7 +41,9 @@ import org.apache.hyracks.api.dataflow.IActivity;
import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.util.HyracksConstants;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
+import org.apache.hyracks.dataflow.common.util.TaskUtils;
import org.apache.hyracks.dataflow.std.base.AbstractUnaryInputUnaryOutputOperatorNodePushable;
public class FeedMetaStoreNodePushable extends AbstractUnaryInputUnaryOutputOperatorNodePushable {
@@ -107,7 +109,7 @@ public class FeedMetaStoreNodePushable extends AbstractUnaryInputUnaryOutputOper
.getApplicationObject()).getFeedManager();
this.targetId = targetId;
this.message = new VSizeFrame(ctx);
- ctx.setSharedObject(message);
+ TaskUtils.putInSharedMap(HyracksConstants.KEY_MESSAGE, message, ctx);
this.recordDescProvider = recordDescProvider;
this.opDesc = feedMetaOperatorDescriptor;
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7575785a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
index 4ad08b3..98cb4b0 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
@@ -59,7 +59,7 @@ public class DataflowControllerProvider {
public static IDataFlowController getDataflowController(ARecordType recordType, IHyracksTaskContext ctx,
int partition, IExternalDataSourceFactory dataSourceFactory, IDataParserFactory dataParserFactory,
Map<String, String> configuration, boolean indexingOp, boolean isFeed, FeedLogManager feedLogManager)
- throws HyracksDataException {
+ throws HyracksDataException {
try {
switch (dataSourceFactory.getDataSourceType()) {
case RECORDS:
@@ -67,6 +67,7 @@ public class DataflowControllerProvider {
IRecordReader<?> recordReader = recordReaderFactory.createRecordReader(ctx, partition);
IRecordDataParserFactory<?> recordParserFactory = (IRecordDataParserFactory<?>) dataParserFactory;
IRecordDataParser<?> dataParser = recordParserFactory.createRecordParser(ctx);
+ boolean sendMarker = ExternalDataUtils.isSendMarker(configuration);
if (indexingOp) {
return new IndexingDataFlowController(ctx,
DataflowUtils.getTupleForwarder(configuration, feedLogManager), dataParser,
@@ -80,18 +81,19 @@ public class DataflowControllerProvider {
if (isChangeFeed) {
int numOfKeys = ExternalDataUtils.getNumberOfKeys(configuration);
return new ChangeFeedWithMetaDataFlowController(ctx, tupleForwarder, feedLogManager,
- numOfKeys + 2, (IRecordWithMetadataParser) dataParser, recordReader);
+ numOfKeys + 2, (IRecordWithMetadataParser) dataParser, recordReader,
+ sendMarker);
} else {
return new FeedWithMetaDataFlowController(ctx, tupleForwarder, feedLogManager, 2,
- (IRecordWithMetadataParser) dataParser, recordReader);
+ (IRecordWithMetadataParser) dataParser, recordReader, sendMarker);
}
} else if (isChangeFeed) {
int numOfKeys = ExternalDataUtils.getNumberOfKeys(configuration);
return new ChangeFeedDataFlowController(ctx, tupleForwarder, feedLogManager, numOfKeys + 1,
- (IRecordWithPKDataParser) dataParser, recordReader);
+ (IRecordWithPKDataParser) dataParser, recordReader, sendMarker);
} else {
return new FeedRecordDataFlowController(ctx, tupleForwarder, feedLogManager, 1, dataParser,
- recordReader);
+ recordReader, sendMarker);
}
} else {
return new RecordDataFlowController(ctx,
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7575785a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/DataflowUtils.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/DataflowUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/DataflowUtils.java
index ad945f2..ed811ad 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/DataflowUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/DataflowUtils.java
@@ -30,6 +30,7 @@ import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
public class DataflowUtils {
public static void addTupleToFrame(FrameTupleAppender appender, ArrayTupleBuilder tb, IFrameWriter writer)
@@ -67,4 +68,14 @@ public class DataflowUtils {
throw new HyracksDataException("Unknown tuple forward policy");
}
}
+
+ public static void addTupleToFrame(FrameTupleAppender appender, ITupleReference tuple, IFrameWriter writer)
+ throws HyracksDataException {
+ if (!appender.append(tuple)) {
+ appender.write(writer, true);
+ if (!appender.append(tuple)) {
+ throw new HyracksDataException("Tuple is too large for a frame");
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7575785a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index 55dee04..e251f32 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -46,6 +46,8 @@ public class ExternalDataConstants {
public static final String KEY_FILESYSTEM = "fs";
// specifies the address of the HDFS name node
public static final String KEY_HADOOP_FILESYSTEM_URI = "fs.defaultFS";
+ // specifies whether a feed sends progress markers or not
+ public static final String KEY_SEND_MARKER = "send-marker";
// specifies the class implementation of the accessed instance of HDFS
public static final String KEY_HADOOP_FILESYSTEM_CLASS = "fs.hdfs.impl";
public static final String KEY_HADOOP_INPUT_DIR = "mapred.input.dir";
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7575785a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index 19781f9..23cd39c 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -332,4 +332,8 @@ public class ExternalDataUtils {
}
return intIndicators;
}
+
+ public static boolean isSendMarker(Map<String, String> configuration) {
+ return Boolean.parseBoolean(configuration.get(ExternalDataConstants.KEY_SEND_MARKER));
+ }
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7575785a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
index 6b7eb31..6e1b9e8 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
@@ -64,6 +64,9 @@ public class FeedUtils {
SPILL, // Memory budget has been consumed. Now we're writing to disk
DISCARD // Memory and Disk space budgets have been consumed. Now we're discarding
}
+
+ private FeedUtils() {
+ }
private static String prepareDataverseFeedName(String dataverseName, String feedName) {
return dataverseName + File.separator + feedName;
@@ -87,7 +90,7 @@ public class FeedUtils {
throw new AsterixException("Can't create file splits for adapter with count partitioning constraints");
}
String[] locations = ((AlgebricksAbsolutePartitionConstraint) partitionConstraints).getLocations();
- List<FileSplit> splits = new ArrayList<FileSplit>();
+ List<FileSplit> splits = new ArrayList<>();
for (String nd : locations) {
splits.add(splitsForAdapter(dataverseName, feedName, nd,
AsterixClusterProperties.INSTANCE.getNodePartitions(nd)[0]));
@@ -120,6 +123,7 @@ public class FeedUtils {
int offset = fta.getTupleStartOffset(tc);
int len = fta.getTupleLength(tc);
int newSize = FrameHelper.calcAlignedFrameSizeToStore(1, len, message.getMinSize());
+ message.reset();
message.ensureFrameSize(newSize);
message.getBuffer().clear();
message.getBuffer().put(input.array(), offset, len);
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7575785a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateDataverseStatement.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateDataverseStatement.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateDataverseStatement.java
index 820ae5f..7a5b722 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateDataverseStatement.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/CreateDataverseStatement.java
@@ -22,6 +22,7 @@ import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.lang.common.base.Statement;
import org.apache.asterix.lang.common.struct.Identifier;
import org.apache.asterix.lang.common.visitor.base.ILangVisitor;
+import org.apache.asterix.runtime.formats.NonTaggedDataFormat;
public class CreateDataverseStatement extends Statement {
@@ -31,11 +32,7 @@ public class CreateDataverseStatement extends Statement {
public CreateDataverseStatement(Identifier dataverseName, String format, boolean ifNotExists) {
this.dataverseName = dataverseName;
- if (format == null) {
- this.format = "org.apache.asterix.runtime.formats.NonTaggedDataFormat";
- } else {
- this.format = format;
- }
+ this.format = (format == null) ? NonTaggedDataFormat.class.getName() : format;
this.ifNotExists = ifNotExists;
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7575785a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/serde/AInt64SerializerDeserializer.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/serde/AInt64SerializerDeserializer.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/serde/AInt64SerializerDeserializer.java
index 8738c97..000339f 100644
--- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/serde/AInt64SerializerDeserializer.java
+++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/serde/AInt64SerializerDeserializer.java
@@ -58,7 +58,7 @@ public class AInt64SerializerDeserializer implements ISerializerDeserializer<AIn
return (((long) (bytes[offset] & 0xff)) << 56) + (((long) (bytes[offset + 1] & 0xff)) << 48)
+ (((long) (bytes[offset + 2] & 0xff)) << 40) + (((long) (bytes[offset + 3] & 0xff)) << 32)
+ (((long) (bytes[offset + 4] & 0xff)) << 24) + (((long) (bytes[offset + 5] & 0xff)) << 16)
- + (((long) (bytes[offset + 6] & 0xff)) << 8) + (((long) (bytes[offset + 7] & 0xff)) << 0);
+ + (((long) (bytes[offset + 6] & 0xff)) << 8) + (bytes[offset + 7] & 0xff);
}
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7575785a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
index cabfc77..d247490 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/management/ReplicationChannel.java
@@ -119,8 +119,9 @@ public class ReplicationChannel extends Thread implements IReplicationChannel {
lsmComponentLSNMappingService = new LSMComponentsSyncService();
replicationNotifier = new ReplicationNotifier();
replicationThreads = Executors.newCachedThreadPool(appContext.getThreadFactory());
- Map<String, ClusterPartition[]> nodePartitions = ((IAsterixPropertiesProvider) asterixAppRuntimeContextProvider
- .getAppContext()).getMetadataProperties().getNodePartitions();
+ Map<String, ClusterPartition[]> nodePartitions =
+ ((IAsterixPropertiesProvider) asterixAppRuntimeContextProvider.getAppContext()).getMetadataProperties()
+ .getNodePartitions();
Set<String> nodeReplicationClients = replicationProperties.getNodeReplicationClients(nodeId);
List<Integer> clientsPartitions = new ArrayList<>();
for (String clientId : nodeReplicationClients) {
@@ -141,8 +142,8 @@ public class ReplicationChannel extends Thread implements IReplicationChannel {
try {
serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(true);
- InetSocketAddress replicationChannelAddress = new InetSocketAddress(InetAddress.getByName(nodeIP),
- dataPort);
+ InetSocketAddress replicationChannelAddress =
+ new InetSocketAddress(InetAddress.getByName(nodeIP), dataPort);
serverSocketChannel.socket().bind(replicationChannelAddress);
lsmComponentLSNMappingService.start();
replicationNotifier.start();
@@ -169,8 +170,9 @@ public class ReplicationChannel extends Thread implements IReplicationChannel {
if (remainingFile == 0) {
if (lsmCompProp.getOpType() == LSMOperationType.FLUSH && lsmCompProp.getReplicaLSN() != null
&& replicaUniqueLSN2RemoteMapping.containsKey(lsmCompProp.getNodeUniqueLSN())) {
- int remainingIndexes = replicaUniqueLSN2RemoteMapping
- .get(lsmCompProp.getNodeUniqueLSN()).numOfFlushedIndexes.decrementAndGet();
+ int remainingIndexes =
+ replicaUniqueLSN2RemoteMapping.get(lsmCompProp.getNodeUniqueLSN()).numOfFlushedIndexes
+ .decrementAndGet();
if (remainingIndexes == 0) {
/**
* Note: there is a chance that this will never be removed because some
@@ -216,8 +218,8 @@ public class ReplicationChannel extends Thread implements IReplicationChannel {
public void run() {
Thread.currentThread().setName("Replication Thread");
try {
- ReplicationRequestType replicationFunction = ReplicationProtocol.getRequestType(socketChannel,
- inBuffer);
+ ReplicationRequestType replicationFunction =
+ ReplicationProtocol.getRequestType(socketChannel, inBuffer);
while (replicationFunction != ReplicationRequestType.GOODBYE) {
switch (replicationFunction) {
case REPLICATE_LOG:
@@ -281,8 +283,8 @@ public class ReplicationChannel extends Thread implements IReplicationChannel {
Set<Integer> datasetsToForceFlush = new HashSet<>();
for (IndexInfo iInfo : openIndexesInfo) {
if (requestedIndexesToBeFlushed.contains(iInfo.getResourceId())) {
- AbstractLSMIOOperationCallback ioCallback = (AbstractLSMIOOperationCallback) iInfo.getIndex()
- .getIOOperationCallback();
+ AbstractLSMIOOperationCallback ioCallback =
+ (AbstractLSMIOOperationCallback) iInfo.getIndex().getIOOperationCallback();
//if an index has a pending flush, then the request to flush it will succeed.
if (ioCallback.hasPendingFlush()) {
//remove index to indicate that it will be flushed
@@ -373,8 +375,9 @@ public class ReplicationChannel extends Thread implements IReplicationChannel {
List<String> filesList;
Set<String> replicaIds = request.getReplicaIds();
Set<String> requesterExistingFiles = request.getExistingFiles();
- Map<String, ClusterPartition[]> nodePartitions = ((IAsterixPropertiesProvider) appContextProvider
- .getAppContext()).getMetadataProperties().getNodePartitions();
+ Map<String, ClusterPartition[]> nodePartitions =
+ ((IAsterixPropertiesProvider) appContextProvider.getAppContext()).getMetadataProperties()
+ .getNodePartitions();
for (String replicaId : replicaIds) {
//get replica partitions
ClusterPartition[] replicaPatitions = nodePartitions.get(replicaId);
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7575785a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMPrimaryUpsertOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMPrimaryUpsertOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMPrimaryUpsertOperatorNodePushable.java
index 857f1e2..afd6019 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMPrimaryUpsertOperatorNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/AsterixLSMPrimaryUpsertOperatorNodePushable.java
@@ -25,6 +25,8 @@ import java.nio.ByteBuffer;
import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
import org.apache.asterix.common.dataflow.AsterixLSMIndexUtil;
import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.transactions.ILogMarkerCallback;
+import org.apache.asterix.common.transactions.PrimaryIndexLogMarkerCallback;
import org.apache.asterix.om.pointables.nonvisitor.ARecordPointable;
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.om.types.ATypeTag;
@@ -41,6 +43,7 @@ import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.dataflow.common.util.TaskUtils;
import org.apache.hyracks.storage.am.btree.impls.RangePredicate;
import org.apache.hyracks.storage.am.btree.util.BTreeUtils;
import org.apache.hyracks.storage.am.common.api.IIndexCursor;
@@ -114,8 +117,11 @@ public class AsterixLSMPrimaryUpsertOperatorNodePushable extends LSMIndexInsertU
writer.open();
indexHelper.open();
index = indexHelper.getIndexInstance();
-
try {
+ if (ctx.getSharedObject() != null) {
+ PrimaryIndexLogMarkerCallback callback = new PrimaryIndexLogMarkerCallback((AbstractLSMIndex) index);
+ TaskUtils.putInSharedMap(ILogMarkerCallback.KEY_MARKER_CALLBACK, callback, ctx);
+ }
missingTupleBuilder = new ArrayTupleBuilder(1);
DataOutput out = missingTupleBuilder.getDataOutput();
try {
@@ -135,8 +141,8 @@ public class AsterixLSMPrimaryUpsertOperatorNodePushable extends LSMIndexInsertU
.createSearchOperationCallback(indexHelper.getResourceID(), ctx, this));
cursor = indexAccessor.createSearchCursor(false);
frameTuple = new FrameTupleReference();
- IAsterixAppRuntimeContext runtimeCtx = (IAsterixAppRuntimeContext) ctx.getJobletContext()
- .getApplicationContext().getApplicationObject();
+ IAsterixAppRuntimeContext runtimeCtx =
+ (IAsterixAppRuntimeContext) ctx.getJobletContext().getApplicationContext().getApplicationObject();
AsterixLSMIndexUtil.checkAndSetFirstLSN((AbstractLSMIndex) index,
runtimeCtx.getTransactionSubsystem().getLogManager());
} catch (Exception e) {
@@ -241,10 +247,7 @@ public class AsterixLSMPrimaryUpsertOperatorNodePushable extends LSMIndexInsertU
writeOutput(i, recordWasInserted);
i++;
}
- if (tupleCount > 0) {
- // All tuples has to move forward to maintain the correctness of the transaction pipeline
- appender.write(writer, true);
- }
+ appender.write(writer, true);
} catch (IndexException | IOException | AsterixException e) {
throw new HyracksDataException(e);
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7575785a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
index 94d2a8c..9a66aa5 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
@@ -80,8 +80,8 @@ public class LogManager implements ILogManager, ILifeCycleComponent {
public LogManager(TransactionSubsystem txnSubsystem) {
this.txnSubsystem = txnSubsystem;
- logManagerProperties = new LogManagerProperties(this.txnSubsystem.getTransactionProperties(),
- this.txnSubsystem.getId());
+ logManagerProperties =
+ new LogManagerProperties(this.txnSubsystem.getTransactionProperties(), this.txnSubsystem.getId());
logFileSize = logManagerProperties.getLogPartitionSize();
logPageSize = logManagerProperties.getLogPageSize();
numLogPages = logManagerProperties.getNumLogPages();
@@ -172,6 +172,9 @@ public class LogManager implements ILogManager, ILifeCycleComponent {
if (logRecord.getLogType() == LogType.FLUSH) {
logRecord.setLSN(appendLSN.get());
}
+ if (logRecord.isMarker()) {
+ logRecord.logAppended(appendLSN.get());
+ }
appendLSN.addAndGet(logRecord.getLogSize());
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7575785a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
index afb926b..0183b29 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
@@ -92,7 +92,7 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
private final long SHARP_CHECKPOINT_LSN = -1;
private final boolean replicationEnabled;
public static final long NON_SHARP_CHECKPOINT_TARGET_LSN = -1;
- private final static String RECOVERY_FILES_DIR_NAME = "recovery_temp";
+ private static final String RECOVERY_FILES_DIR_NAME = "recovery_temp";
private Map<Integer, JobEntityCommits> jobId2WinnerEntitiesMap = null;
private final long cachedEntityCommitsPerJobSize;
private final PersistentLocalResourceRepository localResourceRepository;
@@ -108,8 +108,8 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
this.txnSubsystem = txnSubsystem;
logMgr = (LogManager) txnSubsystem.getLogManager();
checkpointHistory = txnSubsystem.getTransactionProperties().getCheckpointHistory();
- IAsterixPropertiesProvider propertiesProvider = (IAsterixPropertiesProvider) txnSubsystem
- .getAsterixAppRuntimeContextProvider().getAppContext();
+ IAsterixPropertiesProvider propertiesProvider =
+ (IAsterixPropertiesProvider) txnSubsystem.getAsterixAppRuntimeContextProvider().getAppContext();
replicationEnabled = propertiesProvider.getReplicationProperties().isReplicationEnabled();
localResourceRepository = (PersistentLocalResourceRepository) txnSubsystem.getAsterixAppRuntimeContextProvider()
.getLocalResourceRepository();
@@ -271,6 +271,7 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
break;
case LogType.FLUSH:
case LogType.WAIT:
+ case LogType.MARKER:
break;
default:
throw new ACIDException("Unsupported LogType: " + logRecord.getLogType());
@@ -361,10 +362,10 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
//if index is not registered into IndexLifeCycleManager,
//create the index using LocalMetadata stored in LocalResourceRepository
//get partition path in this node
- String partitionIODevicePath = localResourceRepository
- .getPartitionPath(localResource.getPartition());
- String resourceAbsolutePath = partitionIODevicePath + File.separator
- + localResource.getResourceName();
+ String partitionIODevicePath =
+ localResourceRepository.getPartitionPath(localResource.getPartition());
+ String resourceAbsolutePath =
+ partitionIODevicePath + File.separator + localResource.getResourceName();
localResource.setResourcePath(resourceAbsolutePath);
index = (ILSMIndex) datasetLifecycleManager.getIndex(resourceAbsolutePath);
if (index == null) {
@@ -379,8 +380,8 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
//#. get maxDiskLastLSN
ILSMIndex lsmIndex = index;
try {
- maxDiskLastLsn = ((AbstractLSMIOOperationCallback) lsmIndex
- .getIOOperationCallback())
+ maxDiskLastLsn =
+ ((AbstractLSMIOOperationCallback) lsmIndex.getIOOperationCallback())
.getComponentLSN(lsmIndex.getImmutableComponents());
} catch (HyracksDataException e) {
datasetLifecycleManager.close(resourceAbsolutePath);
@@ -405,6 +406,7 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
case LogType.ABORT:
case LogType.FLUSH:
case LogType.UPSERT_ENTITY_COMMIT:
+ case LogType.MARKER:
//do nothing
break;
default:
@@ -443,8 +445,8 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
//right after the new checkpoint file is written.
File[] prevCheckpointFiles = getPreviousCheckpointFiles();
- IDatasetLifecycleManager datasetLifecycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
- .getDatasetLifecycleManager();
+ IDatasetLifecycleManager datasetLifecycleManager =
+ txnSubsystem.getAsterixAppRuntimeContextProvider().getDatasetLifecycleManager();
//flush all in-memory components if it is the sharp checkpoint
if (isSharpCheckpoint) {
datasetLifecycleManager.flushAllDatasets();
@@ -467,8 +469,8 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
Set<Integer> deadReplicasPartitions = new HashSet<>();
//get partitions of the dead replicas that are not active on this node
for (String deadReplicaId : deadReplicaIds) {
- ClusterPartition[] nodePartitons = metadataProperties.getNodePartitions()
- .get(deadReplicaId);
+ ClusterPartition[] nodePartitons =
+ metadataProperties.getNodePartitions().get(deadReplicaId);
for (ClusterPartition partition : nodePartitons) {
if (!localResourceRepository.getActivePartitions()
.contains(partition.getPartitionId())) {
@@ -492,8 +494,8 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
datasetLifecycleManager.scheduleAsyncFlushForLaggingDatasets(nonSharpCheckpointTargetLSN);
if (replicationEnabled) {
//request remote replicas to flush lagging indexes
- IReplicationManager replicationManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
- .getAppContext().getReplicationManager();
+ IReplicationManager replicationManager =
+ txnSubsystem.getAsterixAppRuntimeContextProvider().getAppContext().getReplicationManager();
try {
replicationManager.requestFlushLaggingReplicaIndexes(nonSharpCheckpointTargetLSN);
} catch (IOException e) {
@@ -564,16 +566,16 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
@Override
public long getLocalMinFirstLSN() throws HyracksDataException {
- IDatasetLifecycleManager datasetLifecycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
- .getDatasetLifecycleManager();
+ IDatasetLifecycleManager datasetLifecycleManager =
+ txnSubsystem.getAsterixAppRuntimeContextProvider().getDatasetLifecycleManager();
List<IIndex> openIndexList = datasetLifecycleManager.getOpenIndexes();
long firstLSN;
//the min first lsn can only be the current append or smaller
long minFirstLSN = logMgr.getAppendLSN();
if (openIndexList.size() > 0) {
for (IIndex index : openIndexList) {
- AbstractLSMIOOperationCallback ioCallback = (AbstractLSMIOOperationCallback) ((ILSMIndex) index)
- .getIOOperationCallback();
+ AbstractLSMIOOperationCallback ioCallback =
+ (AbstractLSMIOOperationCallback) ((ILSMIndex) index).getIOOperationCallback();
if (!((AbstractLSMIndex) index).isCurrentMutableComponentEmpty() || ioCallback.hasPendingFlush()) {
firstLSN = ioCallback.getFirstLSN();
@@ -585,8 +587,8 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
}
private long getRemoteMinFirstLSN() {
- IReplicaResourcesManager remoteResourcesManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
- .getAppContext().getReplicaResourcesManager();
+ IReplicaResourcesManager remoteResourcesManager =
+ txnSubsystem.getAsterixAppRuntimeContextProvider().getAppContext().getReplicaResourcesManager();
long minRemoteLSN = remoteResourcesManager.getPartitionsMinLSN(localResourceRepository.getInactivePartitions());
return minRemoteLSN;
}
@@ -783,6 +785,7 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
case LogType.ABORT:
case LogType.FLUSH:
case LogType.WAIT:
+ case LogType.MARKER:
//ignore
break;
default:
@@ -798,8 +801,8 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
//undo loserTxn's effect
LOGGER.log(Level.INFO, "undoing loser transaction's effect");
- IDatasetLifecycleManager datasetLifecycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
- .getDatasetLifecycleManager();
+ IDatasetLifecycleManager datasetLifecycleManager =
+ txnSubsystem.getAsterixAppRuntimeContextProvider().getDatasetLifecycleManager();
//TODO sort loser entities by smallest LSN to undo in one pass.
Iterator<Entry<TxnId, List<Long>>> iter = jobLoserEntity2LSNsMap.entrySet().iterator();
int undoCount = 0;
@@ -855,10 +858,10 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
private static void undo(ILogRecord logRecord, IDatasetLifecycleManager datasetLifecycleManager) {
try {
- ILSMIndex index = (ILSMIndex) datasetLifecycleManager.getIndex(logRecord.getDatasetId(),
- logRecord.getResourceId());
- ILSMIndexAccessor indexAccessor = index.createAccessor(NoOpOperationCallback.INSTANCE,
- NoOpOperationCallback.INSTANCE);
+ ILSMIndex index =
+ (ILSMIndex) datasetLifecycleManager.getIndex(logRecord.getDatasetId(), logRecord.getResourceId());
+ ILSMIndexAccessor indexAccessor =
+ index.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
if (logRecord.getNewOp() == IndexOperation.INSERT.ordinal()) {
indexAccessor.forceDelete(logRecord.getNewValue());
} else if (logRecord.getNewOp() == IndexOperation.DELETE.ordinal()) {
@@ -873,10 +876,10 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {
private static void redo(ILogRecord logRecord, IDatasetLifecycleManager datasetLifecycleManager) {
try {
- ILSMIndex index = (ILSMIndex) datasetLifecycleManager.getIndex(logRecord.getDatasetId(),
- logRecord.getResourceId());
- ILSMIndexAccessor indexAccessor = index.createAccessor(NoOpOperationCallback.INSTANCE,
- NoOpOperationCallback.INSTANCE);
+ ILSMIndex index =
+ (ILSMIndex) datasetLifecycleManager.getIndex(logRecord.getDatasetId(), logRecord.getResourceId());
+ ILSMIndexAccessor indexAccessor =
+ index.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
if (logRecord.getNewOp() == IndexOperation.INSERT.ordinal()) {
indexAccessor.forceInsert(logRecord.getNewValue());
} else if (logRecord.getNewOp() == IndexOperation.DELETE.ordinal()) {
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7575785a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IFrameWriter.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IFrameWriter.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IFrameWriter.java
index d94f933..19d7afb 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IFrameWriter.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IFrameWriter.java
@@ -64,6 +64,7 @@ public interface IFrameWriter {
/**
* Provide data to the stream of this {@link IFrameWriter}.
+ *
* @param buffer
* - Buffer containing data.
* @throws HyracksDataException
@@ -72,21 +73,24 @@ public interface IFrameWriter {
/**
* request the frame to push its content forward and flush its consumers
+ *
* @throws HyracksDataException
*/
public default void flush() throws HyracksDataException {
- throw new HyracksDataException("flush() is not supported in this IFrameWriter");
+ // No Op
}
/**
* Indicate that a failure was encountered and the current stream is to be
* aborted.
+ *
* @throws HyracksDataException
*/
public void fail() throws HyracksDataException;
/**
* Close this {@link IFrameWriter} and give up all resources.
+ *
* @throws HyracksDataException
*/
public void close() throws HyracksDataException;
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7575785a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java
index 3781489..4eb3ebf 100644
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/context/IHyracksTaskContext.java
@@ -45,7 +45,7 @@ public interface IHyracksTaskContext
public void sendApplicationMessageToCC(byte[] message, DeploymentId deploymendId) throws Exception;
- public void setSharedObject(Object sharedObject);
+ public void setSharedObject(Object object);
public Object getSharedObject();
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7575785a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/HyracksConstants.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/HyracksConstants.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/HyracksConstants.java
new file mode 100644
index 0000000..8d55235
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/HyracksConstants.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.util;
+
+public class HyracksConstants {
+ public static final String KEY_MESSAGE = "HYX:MSG";
+
+ private HyracksConstants() {
+ }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7575785a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index 2f8def1..43cac74 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -260,7 +260,8 @@ public class NodeControllerService implements IControllerService {
init();
datasetNetworkManager.start();
- IIPCHandle ccIPCHandle = ipc.getHandle(new InetSocketAddress(ncConfig.ccHost, ncConfig.ccPort), ncConfig.retries);
+ IIPCHandle ccIPCHandle =
+ ipc.getHandle(new InetSocketAddress(ncConfig.ccHost, ncConfig.ccPort), ncConfig.retries);
this.ccs = new ClusterControllerRemoteProxy(ccIPCHandle);
HeartbeatSchema.GarbageCollectorInfo[] gcInfos = new HeartbeatSchema.GarbageCollectorInfo[gcMXBeans.size()];
for (int i = 0; i < gcInfos.length; ++i) {
@@ -270,12 +271,11 @@ public class NodeControllerService implements IControllerService {
// Use "public" versions of network addresses and ports
NetworkAddress datasetAddress = datasetNetworkManager.getPublicNetworkAddress();
NetworkAddress netAddress = netManager.getPublicNetworkAddress();
- ccs.registerNode(new NodeRegistration(ipc.getSocketAddress(), id, ncConfig, netAddress,
- datasetAddress, osMXBean.getName(), osMXBean.getArch(), osMXBean
- .getVersion(), osMXBean.getAvailableProcessors(), runtimeMXBean.getVmName(), runtimeMXBean
- .getVmVersion(), runtimeMXBean.getVmVendor(), runtimeMXBean.getClassPath(), runtimeMXBean
- .getLibraryPath(), runtimeMXBean.getBootClassPath(), runtimeMXBean.getInputArguments(),
- runtimeMXBean.getSystemProperties(), hbSchema));
+ ccs.registerNode(new NodeRegistration(ipc.getSocketAddress(), id, ncConfig, netAddress, datasetAddress,
+ osMXBean.getName(), osMXBean.getArch(), osMXBean.getVersion(), osMXBean.getAvailableProcessors(),
+ runtimeMXBean.getVmName(), runtimeMXBean.getVmVersion(), runtimeMXBean.getVmVendor(),
+ runtimeMXBean.getClassPath(), runtimeMXBean.getLibraryPath(), runtimeMXBean.getBootClassPath(),
+ runtimeMXBean.getInputArguments(), runtimeMXBean.getSystemProperties(), hbSchema));
synchronized (this) {
while (registrationPending) {
@@ -490,7 +490,8 @@ public class NodeControllerService implements IControllerService {
CCNCFunctions.Function fn = (CCNCFunctions.Function) payload;
switch (fn.getFunctionId()) {
case SEND_APPLICATION_MESSAGE: {
- CCNCFunctions.SendApplicationMessageFunction amf = (CCNCFunctions.SendApplicationMessageFunction) fn;
+ CCNCFunctions.SendApplicationMessageFunction amf =
+ (CCNCFunctions.SendApplicationMessageFunction) fn;
queue.schedule(new ApplicationMessageWork(NodeControllerService.this, amf.getMessage(),
amf.getDeploymentId(), amf.getNodeId()));
return;
@@ -515,7 +516,8 @@ public class NodeControllerService implements IControllerService {
}
case REPORT_PARTITION_AVAILABILITY: {
- CCNCFunctions.ReportPartitionAvailabilityFunction rpaf = (CCNCFunctions.ReportPartitionAvailabilityFunction) fn;
+ CCNCFunctions.ReportPartitionAvailabilityFunction rpaf =
+ (CCNCFunctions.ReportPartitionAvailabilityFunction) fn;
queue.schedule(new ReportPartitionAvailabilityWork(NodeControllerService.this,
rpaf.getPartitionId(), rpaf.getNetworkAddress()));
return;
@@ -528,7 +530,8 @@ public class NodeControllerService implements IControllerService {
}
case GET_NODE_CONTROLLERS_INFO_RESPONSE: {
- CCNCFunctions.GetNodeControllersInfoResponseFunction gncirf = (CCNCFunctions.GetNodeControllersInfoResponseFunction) fn;
+ CCNCFunctions.GetNodeControllersInfoResponseFunction gncirf =
+ (CCNCFunctions.GetNodeControllersInfoResponseFunction) fn;
setNodeControllersInfo(gncirf.getNodeControllerInfos());
return;
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7575785a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
index 134154c..f463bfa 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/Task.java
@@ -394,8 +394,8 @@ public class Task implements IHyracksTaskContext, ICounterContext, Runnable {
}
@Override
- public void setSharedObject(Object sharedObject) {
- this.sharedObject = sharedObject;
+ public void setSharedObject(Object object) {
+ this.sharedObject = object;
}
@Override
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7575785a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
index 7d12296..7d90a8b 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
@@ -106,20 +106,20 @@ public class MaterializingPipelinedPartition implements IFrameWriter, IPartition
ByteBuffer buffer = ctx.allocateFrame();
boolean fail = false;
boolean done = false;
- boolean flush = false;
while (!fail && !done) {
synchronized (MaterializingPipelinedPartition.this) {
- if (flushRequest) {
- flushRequest = false;
- flush = true;
- }
- while (offset >= size && !eos && !failed && !flush) {
+ while (offset >= size && !eos && !failed) {
+ if (flushRequest) {
+ flushRequest = false;
+ writer.flush();
+ }
try {
MaterializingPipelinedPartition.this.wait();
} catch (InterruptedException e) {
throw new HyracksDataException(e);
}
}
+ flushRequest = false;
fail = failed;
done = eos && offset >= size;
}
@@ -134,10 +134,6 @@ public class MaterializingPipelinedPartition implements IFrameWriter, IPartition
offset += readLen;
buffer.flip();
writer.nextFrame(buffer);
- if (flush) {
- writer.flush();
- flush = false;
- }
}
}
}
@@ -213,4 +209,4 @@ public class MaterializingPipelinedPartition implements IFrameWriter, IPartition
flushRequest = true;
notifyAll();
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7575785a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java
index e7131d5..57f8072 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/AbstractFrameAppender.java
@@ -89,9 +89,7 @@ public class AbstractFrameAppender implements IFrameAppender {
@Override
public void write(IFrameWriter outWriter, boolean clearFrame) throws HyracksDataException {
getBuffer().clear();
- if (getTupleCount() > 0) {
- outWriter.nextFrame(getBuffer());
- }
+ outWriter.nextFrame(getBuffer());
if (clearFrame) {
frame.reset();
reset(getBuffer(), true);
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7575785a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppender.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppender.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppender.java
index ef11b5b..3ef8b28 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppender.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAppender.java
@@ -45,13 +45,13 @@ public class FrameTupleAppender extends AbstractFrameAppender implements IFrameT
* append fieldSlots and bytes to the current frame
*/
@Override
- public boolean append(int[] fieldSlots, byte[] bytes, int offset, int length) throws HyracksDataException {
- if (canHoldNewTuple(fieldSlots.length, length)) {
- for (int i = 0; i < fieldSlots.length; ++i) {
- IntSerDeUtils.putInt(array, tupleDataEndOffset + i * 4, fieldSlots[i]);
+ public boolean append(int[] fieldEndOffsets, byte[] bytes, int offset, int length) throws HyracksDataException {
+ if (canHoldNewTuple(fieldEndOffsets.length, length)) {
+ for (int i = 0; i < fieldEndOffsets.length; ++i) {
+ IntSerDeUtils.putInt(array, tupleDataEndOffset + i * 4, fieldEndOffsets[i]);
}
- System.arraycopy(bytes, offset, array, tupleDataEndOffset + fieldSlots.length * 4, length);
- tupleDataEndOffset += fieldSlots.length * 4 + length;
+ System.arraycopy(bytes, offset, array, tupleDataEndOffset + fieldEndOffsets.length * 4, length);
+ tupleDataEndOffset += fieldEndOffsets.length * 4 + length;
IntSerDeUtils.putInt(getBuffer().array(),
FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1), tupleDataEndOffset);
++tupleCount;
@@ -63,19 +63,24 @@ public class FrameTupleAppender extends AbstractFrameAppender implements IFrameT
}
public boolean append(ITupleReference tuple) throws HyracksDataException {
- int tupleSize = 0;
+ int length = 0;
for (int i = 0; i < tuple.getFieldCount(); i++) {
- tupleSize += tuple.getFieldLength(i);
+ length += tuple.getFieldLength(i);
}
- if (canHoldNewTuple(tuple.getFieldCount(), tupleSize)) {
- int offset = 0;
+
+ if (canHoldNewTuple(tuple.getFieldCount(), length)) {
+ length = 0;
+ for (int i = 0; i < tuple.getFieldCount(); ++i) {
+ length += tuple.getFieldLength(i);
+ IntSerDeUtils.putInt(array, tupleDataEndOffset + i * 4, length);
+ }
+ length = 0;
for (int i = 0; i < tuple.getFieldCount(); ++i) {
- IntSerDeUtils.putInt(array, tupleDataEndOffset + i * 4, offset);
System.arraycopy(tuple.getFieldData(i), tuple.getFieldStart(i), array,
- tupleDataEndOffset + tuple.getFieldCount() * 4, tuple.getFieldLength(i));
- offset += tuple.getFieldLength(i);
+ tupleDataEndOffset + tuple.getFieldCount() * 4 + length, tuple.getFieldLength(i));
+ length += tuple.getFieldLength(i);
}
- tupleDataEndOffset += tuple.getFieldCount() * 4 + tupleSize;
+ tupleDataEndOffset += tuple.getFieldCount() * 4 + length;
IntSerDeUtils.putInt(getBuffer().array(),
FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1), tupleDataEndOffset);
++tupleCount;
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7575785a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java
index cae659d..7f518cd 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java
@@ -26,7 +26,9 @@ import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.comm.VSizeFrame;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.util.HyracksConstants;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import org.apache.hyracks.dataflow.common.util.TaskUtils;
import org.apache.hyracks.util.IntSerDeUtils;
/**
@@ -39,7 +41,9 @@ public class MessagingFrameTupleAppender extends FrameTupleAppender {
private static final int NULL_MESSAGE_SIZE = 1;
public static final byte NULL_FEED_MESSAGE = 0x01;
public static final byte ACK_REQ_FEED_MESSAGE = 0x02;
- public static final byte SNAPSHOT_MESSAGE = 0x03;
+ public static final byte MARKER_MESSAGE = 0x03;
+ private boolean initialized = false;
+ private VSizeFrame message;
public MessagingFrameTupleAppender(IHyracksTaskContext ctx) {
this.ctx = ctx;
@@ -59,8 +63,8 @@ public class MessagingFrameTupleAppender extends FrameTupleAppender {
case ACK_REQ_FEED_MESSAGE:
aString.append("Ack Request, ");
break;
- case SNAPSHOT_MESSAGE:
- aString.append("Snapshot, ");
+ case MARKER_MESSAGE:
+ aString.append("Marker, ");
break;
default:
aString.append("Unknown, ");
@@ -78,8 +82,8 @@ public class MessagingFrameTupleAppender extends FrameTupleAppender {
return NULL_FEED_MESSAGE;
case ACK_REQ_FEED_MESSAGE:
return ACK_REQ_FEED_MESSAGE;
- case SNAPSHOT_MESSAGE:
- return SNAPSHOT_MESSAGE;
+ case MARKER_MESSAGE:
+ return MARKER_MESSAGE;
default:
throw new HyracksDataException("Unknown message type");
}
@@ -101,24 +105,35 @@ public class MessagingFrameTupleAppender extends FrameTupleAppender {
@Override
public void write(IFrameWriter outWriter, boolean clearFrame) throws HyracksDataException {
+ if (!initialized) {
+ message = TaskUtils.<VSizeFrame> get(HyracksConstants.KEY_MESSAGE, ctx);
+ initialized = true;
+ }
// If message fits, we append it, otherwise, we append a null message, then send a message only
// frame with the message
- ByteBuffer message = ((VSizeFrame) ctx.getSharedObject()).getBuffer();
- int messageSize = message.limit() - message.position();
- if (hasEnoughSpace(1, messageSize)) {
- appendMessage(message);
- forward(outWriter);
- } else {
+ if (message == null) {
if (tupleCount > 0) {
appendNullMessage();
forward(outWriter);
}
- if (!hasEnoughSpace(1, messageSize)) {
- frame.ensureFrameSize(FrameHelper.calcAlignedFrameSizeToStore(1, messageSize, frame.getMinSize()));
- reset(frame.getBuffer(), true);
+ } else {
+ ByteBuffer buffer = message.getBuffer();
+ int messageSize = buffer.limit() - buffer.position();
+ if (hasEnoughSpace(1, messageSize)) {
+ appendMessage(buffer);
+ forward(outWriter);
+ } else {
+ if (tupleCount > 0) {
+ appendNullMessage();
+ forward(outWriter);
+ }
+ if (!hasEnoughSpace(1, messageSize)) {
+ frame.ensureFrameSize(FrameHelper.calcAlignedFrameSizeToStore(1, messageSize, frame.getMinSize()));
+ reset(frame.getBuffer(), true);
+ }
+ appendMessage(buffer);
+ forward(outWriter);
}
- appendMessage(message);
- forward(outWriter);
}
}
@@ -130,8 +145,9 @@ public class MessagingFrameTupleAppender extends FrameTupleAppender {
}
private void appendMessage(ByteBuffer message) {
- System.arraycopy(message.array(), message.position(), array, tupleDataEndOffset, message.limit());
- tupleDataEndOffset += message.limit();
+ int messageLength = message.limit() - message.position();
+ System.arraycopy(message.array(), message.position(), array, tupleDataEndOffset, messageLength);
+ tupleDataEndOffset += messageLength;
IntSerDeUtils.putInt(getBuffer().array(),
FrameHelper.getTupleCountOffset(frame.getFrameSize()) - 4 * (tupleCount + 1), tupleDataEndOffset);
++tupleCount;
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7575785a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/util/TaskUtils.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/util/TaskUtils.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/util/TaskUtils.java
new file mode 100644
index 0000000..4f27d79
--- /dev/null
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/util/TaskUtils.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.dataflow.common.util;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+
+/**
+ * A Utility class for facilitating common operations used with a hyracks task
+ */
+public class TaskUtils {
+ private TaskUtils() {
+ }
+
+ /**
+ * Get the shared object of a task as a {@code Map<String, Object>}.
+ *
+ * @param ctx
+ * the task context
+ * @param create
+ * whether to create and install the map if it does not exist yet
+ * @return the task shared map, or null if it is absent and create is false
+ */
+ @SuppressWarnings("unchecked")
+ public static Map<String, Object> getSharedMap(IHyracksTaskContext ctx, boolean create) {
+ if (ctx.getSharedObject() != null) {
+ return (Map<String, Object>) ctx.getSharedObject();
+ } else if (create) {
+ Map<String, Object> taskMap = new HashMap<>();
+ ctx.setSharedObject(taskMap);
+ return taskMap;
+ } else {
+ return null;
+ }
+ }
+
+ /**
+ * Put the key-value pair into the task's shared map, creating the map if needed.
+ *
+ * @param key
+ * the key under which the value is stored
+ * @param object
+ * the value to store
+ * @param ctx
+ * the task context
+ */
+ public static void putInSharedMap(String key, Object object, IHyracksTaskContext ctx) {
+ TaskUtils.getSharedMap(ctx, true).put(key, object);
+ }
+
+ /**
+ * Get a value of type {@code T} from the shared map of the task.
+ *
+ * @param key
+ * the key to look up
+ * @param ctx
+ * the task context
+ * @return the value associated with the key cast to {@code T}, or null if the map or key is absent
+ */
+ @SuppressWarnings("unchecked")
+ public static <T> T get(String key, IHyracksTaskContext ctx) {
+ Map<String, Object> sharedMap = TaskUtils.getSharedMap(ctx, false);
+ return sharedMap == null ? null : (T) sharedMap.get(key);
+ }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7575785a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
index 30ee3c0..b4e51be 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/preclustered/PreclusteredGroupWriter.java
@@ -65,8 +65,8 @@ public class PreclusteredGroupWriter implements IFrameWriter {
RecordDescriptor outRecordDesc, IFrameWriter writer) throws HyracksDataException {
this.groupFields = groupFields;
this.comparators = comparators;
- this.aggregator = aggregatorFactory.createAggregator(ctx, inRecordDesc, outRecordDesc, groupFields, groupFields,
- writer);
+ this.aggregator =
+ aggregatorFactory.createAggregator(ctx, inRecordDesc, outRecordDesc, groupFields, groupFields, writer);
this.aggregateState = aggregator.createAggregateStates();
copyFrame = new VSizeFrame(ctx);
inFrameAccessor = new FrameTupleAccessor(inRecordDesc);
@@ -91,29 +91,32 @@ public class PreclusteredGroupWriter implements IFrameWriter {
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
inFrameAccessor.reset(buffer);
int nTuples = inFrameAccessor.getTupleCount();
- for (int i = 0; i < nTuples; ++i) {
- if (first) {
+ if (nTuples != 0) {
+ for (int i = 0; i < nTuples; ++i) {
+ if (first) {
- tupleBuilder.reset();
- for (int j = 0; j < groupFields.length; j++) {
- tupleBuilder.addField(inFrameAccessor, i, groupFields[j]);
- }
- aggregator.init(tupleBuilder, inFrameAccessor, i, aggregateState);
+ tupleBuilder.reset();
+ for (int j = 0; j < groupFields.length; j++) {
+ tupleBuilder.addField(inFrameAccessor, i, groupFields[j]);
+ }
+ aggregator.init(tupleBuilder, inFrameAccessor, i, aggregateState);
- first = false;
+ first = false;
- } else {
- if (i == 0) {
- switchGroupIfRequired(copyFrameAccessor, copyFrameAccessor.getTupleCount() - 1, inFrameAccessor, i);
} else {
- switchGroupIfRequired(inFrameAccessor, i - 1, inFrameAccessor, i);
- }
+ if (i == 0) {
+ switchGroupIfRequired(copyFrameAccessor, copyFrameAccessor.getTupleCount() - 1, inFrameAccessor,
+ i);
+ } else {
+ switchGroupIfRequired(inFrameAccessor, i - 1, inFrameAccessor, i);
+ }
+ }
}
+ copyFrame.ensureFrameSize(buffer.capacity());
+ FrameUtils.copyAndFlip(buffer, copyFrame.getBuffer());
+ copyFrameAccessor.reset(copyFrame.getBuffer());
}
- copyFrame.ensureFrameSize(buffer.capacity());
- FrameUtils.copyAndFlip(buffer, copyFrame.getBuffer());
- copyFrameAccessor.reset(copyFrame.getBuffer());
}
private void switchGroupIfRequired(FrameTupleAccessor prevTupleAccessor, int prevTupleIndex,
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7575785a/hyracks-fullstack/hyracks/hyracks-maven-plugins/hyracks-virtualcluster-maven-plugin/pom.xml
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-maven-plugins/hyracks-virtualcluster-maven-plugin/pom.xml b/hyracks-fullstack/hyracks/hyracks-maven-plugins/hyracks-virtualcluster-maven-plugin/pom.xml
index 7267cb7..7784199 100644
--- a/hyracks-fullstack/hyracks/hyracks-maven-plugins/hyracks-virtualcluster-maven-plugin/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-maven-plugins/hyracks-virtualcluster-maven-plugin/pom.xml
@@ -33,4 +33,35 @@
<root.dir>${basedir}/../../..</root.dir>
</properties>
+ <build>
+ <pluginManagement>
+ <plugins>
+ <!--This plugin's configuration is used to store Eclipse m2e settings only. It has no influence on the Maven build itself.-->
+ <plugin>
+ <groupId>org.eclipse.m2e</groupId>
+ <artifactId>lifecycle-mapping</artifactId>
+ <version>1.0.0</version>
+ <configuration>
+ <lifecycleMappingMetadata>
+ <pluginExecutions>
+ <pluginExecution>
+ <pluginExecutionFilter>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-plugin-plugin</artifactId>
+ <versionRange>[3.3,)</versionRange>
+ <goals>
+ <goal>descriptor</goal>
+ </goals>
+ </pluginExecutionFilter>
+ <action>
+ <ignore></ignore>
+ </action>
+ </pluginExecution>
+ </pluginExecutions>
+ </lifecycleMappingMetadata>
+ </configuration>
+ </plugin>
+ </plugins>
+ </pluginManagement>
+ </build>
</project>