You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by am...@apache.org on 2016/07/22 19:37:02 UTC
[3/4] asterixdb git commit: Add Test NodeController,
Test Data Generator, and Marker Logs
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7575785a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestTupleReference.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestTupleReference.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestTupleReference.java
new file mode 100644
index 0000000..b676dfd
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestTupleReference.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.common;
+
+import org.apache.hyracks.data.std.util.GrowableArray;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+
+/**
+ * A minimal {@link ITupleReference} for tests in which every field is backed by its own
+ * {@link GrowableArray}. Field start offsets are never written, so each field starts at
+ * offset 0 of its backing array.
+ */
+public class TestTupleReference implements ITupleReference {
+    private GrowableArray[] fields;
+    // One start offset per field; the entries are never modified, so they are always 0.
+    private int[] offsets;
+
+    /**
+     * Wraps the given arrays as the fields of this tuple.
+     *
+     * @param fields
+     *            the backing array of each field; the array itself is stored, not copied
+     */
+    public TestTupleReference(GrowableArray[] fields) {
+        this.fields = fields;
+        this.offsets = new int[fields.length];
+    }
+
+    /**
+     * Creates a tuple with {@code numfields} fields, each backed by a new {@link GrowableArray}.
+     *
+     * @param numfields
+     *            the number of fields
+     */
+    public TestTupleReference(int numfields) {
+        this.fields = new GrowableArray[numfields];
+        for (int i = 0; i < numfields; i++) {
+            fields[i] = new GrowableArray();
+        }
+        this.offsets = new int[numfields];
+    }
+
+    /**
+     * @return the backing arrays of the fields (the internal array, not a copy)
+     */
+    public GrowableArray[] getFields() {
+        return fields;
+    }
+
+    /**
+     * Replaces the fields of this tuple.
+     *
+     * @param fields
+     *            the new backing array of each field
+     */
+    public void setFields(GrowableArray[] fields) {
+        this.fields = fields;
+        // Keep the offsets in step with the new field count; otherwise getFieldStart would
+        // index out of bounds whenever the new array is longer than the previous one.
+        this.offsets = new int[fields.length];
+    }
+
+    @Override
+    public int getFieldCount() {
+        return fields.length;
+    }
+
+    @Override
+    public byte[] getFieldData(int fIdx) {
+        return fields[fIdx].getByteArray();
+    }
+
+    @Override
+    public int getFieldStart(int fIdx) {
+        return offsets[fIdx];
+    }
+
+    @Override
+    public int getFieldLength(int fIdx) {
+        return fields[fIdx].getLength();
+    }
+
+    /**
+     * Calls {@code reset()} on the backing array of every field.
+     */
+    public void reset() {
+        for (GrowableArray field : fields) {
+            field.reset();
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7575785a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ConnectorDescriptorWithMessagingTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ConnectorDescriptorWithMessagingTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ConnectorDescriptorWithMessagingTest.java
index 2f712cc..a253ac0 100644
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ConnectorDescriptorWithMessagingTest.java
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ConnectorDescriptorWithMessagingTest.java
@@ -36,6 +36,7 @@ import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.IConnectorDescriptorRegistry;
import org.apache.hyracks.api.test.TestFrameWriter;
+import org.apache.hyracks.api.util.HyracksConstants;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
@@ -44,6 +45,7 @@ import org.apache.hyracks.dataflow.common.data.marshalling.DoubleSerializerDeser
import org.apache.hyracks.dataflow.common.data.marshalling.Integer64SerializerDeserializer;
import org.apache.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
import org.apache.hyracks.dataflow.common.io.MessagingFrameTupleAppender;
+import org.apache.hyracks.dataflow.common.util.TaskUtils;
import org.apache.hyracks.dataflow.std.connectors.MToNPartitioningWithMessageConnectorDescriptor;
import org.apache.hyracks.dataflow.std.connectors.PartitionWithMessageDataWriter;
import org.apache.hyracks.test.support.TestUtils;
@@ -70,7 +72,7 @@ public class ConnectorDescriptorWithMessagingTest {
IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
VSizeFrame message = new VSizeFrame(ctx);
VSizeFrame tempBuffer = new VSizeFrame(ctx);
- ctx.setSharedObject(message);
+ TaskUtils.putInSharedMap(HyracksConstants.KEY_MESSAGE, message, ctx);
message.getBuffer().clear();
message.getBuffer().put(MessagingFrameTupleAppender.NULL_FEED_MESSAGE);
message.getBuffer().flip();
@@ -144,8 +146,8 @@ public class ConnectorDescriptorWithMessagingTest {
IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
VSizeFrame message = new VSizeFrame(ctx);
VSizeFrame tempBuffer = new VSizeFrame(ctx);
- ctx.setSharedObject(message);
- writeRandomMessage(message, MessagingFrameTupleAppender.SNAPSHOT_MESSAGE, DEFAULT_FRAME_SIZE + 1);
+ TaskUtils.putInSharedMap(HyracksConstants.KEY_MESSAGE, message, ctx);
+ writeRandomMessage(message, MessagingFrameTupleAppender.MARKER_MESSAGE, DEFAULT_FRAME_SIZE + 1);
ISerializerDeserializer<?>[] serdes = new ISerializerDeserializer<?>[] {
Integer64SerializerDeserializer.INSTANCE, DoubleSerializerDeserializer.INSTANCE,
BooleanSerializerDeserializer.INSTANCE, new UTF8StringSerializerDeserializer() };
@@ -165,7 +167,7 @@ public class ConnectorDescriptorWithMessagingTest {
fta.reset(writer.getLastFrame());
Assert.assertEquals(fta.getTupleCount(), 1);
FeedUtils.processFeedMessage(writer.getLastFrame(), tempBuffer, fta);
- Assert.assertEquals(MessagingFrameTupleAppender.SNAPSHOT_MESSAGE,
+ Assert.assertEquals(MessagingFrameTupleAppender.MARKER_MESSAGE,
MessagingFrameTupleAppender.getMessageType(tempBuffer));
}
message.getBuffer().clear();
@@ -228,9 +230,9 @@ public class ConnectorDescriptorWithMessagingTest {
IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
VSizeFrame message = new VSizeFrame(ctx);
VSizeFrame tempBuffer = new VSizeFrame(ctx);
- ctx.setSharedObject(message);
+ TaskUtils.putInSharedMap(HyracksConstants.KEY_MESSAGE, message, ctx);
message.getBuffer().clear();
- writeRandomMessage(message, MessagingFrameTupleAppender.SNAPSHOT_MESSAGE, DEFAULT_FRAME_SIZE);
+ writeRandomMessage(message, MessagingFrameTupleAppender.MARKER_MESSAGE, DEFAULT_FRAME_SIZE);
ISerializerDeserializer<?>[] serdes = new ISerializerDeserializer<?>[] {
Integer64SerializerDeserializer.INSTANCE, DoubleSerializerDeserializer.INSTANCE,
BooleanSerializerDeserializer.INSTANCE, new UTF8StringSerializerDeserializer() };
@@ -264,7 +266,7 @@ public class ConnectorDescriptorWithMessagingTest {
fta.reset(writer.getLastFrame());
Assert.assertEquals(fta.getTupleCount(), 1);
FeedUtils.processFeedMessage(writer.getLastFrame(), tempBuffer, fta);
- Assert.assertEquals(MessagingFrameTupleAppender.SNAPSHOT_MESSAGE,
+ Assert.assertEquals(MessagingFrameTupleAppender.MARKER_MESSAGE,
MessagingFrameTupleAppender.getMessageType(tempBuffer));
}
partitioner.close();
@@ -286,7 +288,7 @@ public class ConnectorDescriptorWithMessagingTest {
IHyracksTaskContext ctx = TestUtils.create(DEFAULT_FRAME_SIZE);
VSizeFrame message = new VSizeFrame(ctx);
VSizeFrame tempBuffer = new VSizeFrame(ctx);
- ctx.setSharedObject(message);
+ TaskUtils.putInSharedMap(HyracksConstants.KEY_MESSAGE, message, ctx);
message.getBuffer().clear();
message.getBuffer().put(MessagingFrameTupleAppender.ACK_REQ_FEED_MESSAGE);
message.getBuffer().flip();
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7575785a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java
new file mode 100644
index 0000000..a0ef31e
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.dataflow;
+
+import java.io.File;
+import java.util.Collection;
+import java.util.Collections;
+
+import org.apache.asterix.app.bootstrap.TestNodeController;
+import org.apache.asterix.app.data.gen.TestTupleCounterFrameWriter;
+import org.apache.asterix.app.data.gen.TupleGenerator;
+import org.apache.asterix.app.data.gen.TupleGenerator.GenerationFunction;
+import org.apache.asterix.common.config.DatasetConfig.DatasetType;
+import org.apache.asterix.common.config.GlobalConfig;
+import org.apache.asterix.common.dataflow.AsterixLSMInsertDeleteOperatorNodePushable;
+import org.apache.asterix.common.transactions.DatasetId;
+import org.apache.asterix.common.transactions.ILogRecord;
+import org.apache.asterix.common.transactions.ITransactionContext;
+import org.apache.asterix.external.util.DataflowUtils;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.entities.InternalDatasetDetails;
+import org.apache.asterix.metadata.entities.InternalDatasetDetails.PartitioningStrategy;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.transaction.management.service.logging.LogReader;
+import org.apache.commons.io.FileUtils;
+import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
+import org.apache.hyracks.api.comm.VSizeFrame;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.test.CountAnswer;
+import org.apache.hyracks.api.test.FrameWriterTestUtils;
+import org.apache.hyracks.api.test.FrameWriterTestUtils.FrameWriterOperation;
+import org.apache.hyracks.api.util.HyracksConstants;
+import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
+import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
+import org.apache.hyracks.dataflow.common.io.MessagingFrameTupleAppender;
+import org.apache.hyracks.dataflow.common.util.TaskUtils;
+import org.apache.hyracks.storage.am.lsm.btree.dataflow.LSMBTreeDataflowHelper;
+import org.apache.hyracks.storage.am.lsm.btree.impls.LSMBTree;
+import org.apache.hyracks.storage.am.lsm.common.impls.NoMergePolicyFactory;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class LogMarkerTest {
+
+ private static final String TEST_CONFIG_FILE_NAME = "asterix-build-configuration.xml";
+ private static final IAType[] KEY_TYPES = { BuiltinType.AINT32 };
+ private static final ARecordType RECORD_TYPE = new ARecordType("TestRecordType", new String[] { "key", "value" },
+ new IAType[] { BuiltinType.AINT32, BuiltinType.AINT64 }, false);
+ private static final GenerationFunction[] RECORD_GEN_FUNCTION =
+ { GenerationFunction.DETERMINISTIC, GenerationFunction.DETERMINISTIC };
+ private static final boolean[] UNIQUE_RECORD_FIELDS = { true, false };
+ private static final ARecordType META_TYPE = null;
+ private static final GenerationFunction[] META_GEN_FUNCTION = null;
+ private static final boolean[] UNIQUE_META_FIELDS = null;
+ private static final int[] KEY_INDEXES = { 0 };
+ private static final int[] KEY_INDICATORS = { 0 };
+ private static final int NUM_OF_RECORDS = 100000;
+ private static final int SNAPSHOT_SIZE = 1000;
+ private static final int DATASET_ID = 101;
+ private static final String SPILL_AREA = "target" + File.separator + "spill_area";
+ private static final String DATAVERSE_NAME = "TestDV";
+ private static final String DATASET_NAME = "TestDS";
+ private static final String DATA_TYPE_NAME = "DUMMY";
+ private static final String NODE_GROUP_NAME = "DEFAULT";
+
+ @Before
+ public void setUp() throws Exception {
+ System.setProperty(GlobalConfig.CONFIG_FILE_PROPERTY, TEST_CONFIG_FILE_NAME);
+ System.out.println("SetUp: ");
+ File f = new File(System.getProperty("user.dir") + File.separator + "target" + File.separator + "txnLogDir");
+ FileUtils.deleteQuietly(f);
+ System.out.println("Dir " + f.getName() + " deleted");
+ f = new File(System.getProperty("user.dir") + File.separator + "target" + File.separator + "IODevice");
+ FileUtils.deleteQuietly(f);
+ System.out.println("Dir " + f.getName() + " deleted");
+ f = new File(System.getProperty("user.dir") + File.separator + SPILL_AREA);
+ FileUtils.deleteQuietly(f);
+ System.out.println("Dir " + f.getName() + " deleted");
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ System.out.println("TearDown");
+ File f = new File(System.getProperty("user.dir") + File.separator + "target" + File.separator + "txnLogDir");
+ FileUtils.deleteQuietly(f);
+ System.out.println("Dir " + f.getName() + " deleted");
+ f = new File(System.getProperty("user.dir") + File.separator + "target" + File.separator + "IODevice");
+ FileUtils.deleteQuietly(f);
+ System.out.println("Dir " + f.getName() + " deleted");
+ f = new File(System.getProperty("user.dir") + File.separator + SPILL_AREA);
+ FileUtils.deleteQuietly(f);
+ System.out.println("Dir " + f.getName() + " deleted");
+ }
+
+ @Test
+ public void testInsertWithSnapshot() {
+ try {
+ TestNodeController nc = new TestNodeController();
+ nc.init();
+ Dataset dataset = new Dataset(DATAVERSE_NAME, DATASET_NAME, DATAVERSE_NAME, DATA_TYPE_NAME,
+ NODE_GROUP_NAME, null, null, new InternalDatasetDetails(null, PartitioningStrategy.HASH,
+ Collections.emptyList(), null, null, null, false, null, false),
+ null, DatasetType.INTERNAL, DATASET_ID, 0);
+ try {
+ nc.createPrimaryIndex(dataset, KEY_TYPES, RECORD_TYPE, META_TYPE, new NoMergePolicyFactory(), null,
+ null);
+ IHyracksTaskContext ctx = nc.createTestContext();
+ nc.newJobId();
+ ITransactionContext txnCtx = nc.getTransactionManager().getTransactionContext(nc.getTxnJobId(), true);
+ AsterixLSMInsertDeleteOperatorNodePushable insertOp = nc.getInsertPipeline(ctx, dataset, KEY_TYPES,
+ RECORD_TYPE, META_TYPE, new NoMergePolicyFactory(), null, null);
+ insertOp.open();
+ TupleGenerator tupleGenerator = new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS,
+ RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS);
+ VSizeFrame frame = new VSizeFrame(ctx);
+ VSizeFrame marker = new VSizeFrame(ctx);
+ FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
+ long markerId = 0L;
+ for (int j = 0; j < NUM_OF_RECORDS; j++) {
+ if (j % SNAPSHOT_SIZE == 0) {
+ marker.reset();
+ marker.getBuffer().put(MessagingFrameTupleAppender.MARKER_MESSAGE);
+ marker.getBuffer().putLong(markerId);
+ marker.getBuffer().flip();
+ markerId++;
+ TaskUtils.putInSharedMap(HyracksConstants.KEY_MESSAGE, marker, ctx);
+ tupleAppender.flush(insertOp);
+ }
+ ITupleReference tuple = tupleGenerator.next();
+ DataflowUtils.addTupleToFrame(tupleAppender, tuple, insertOp);
+ }
+ if (tupleAppender.getTupleCount() > 0) {
+ tupleAppender.write(insertOp, true);
+ }
+ insertOp.close();
+ nc.getTransactionManager().completedTransaction(txnCtx, new DatasetId(-1), -1, true);
+ LSMBTreeDataflowHelper dataflowHelper = nc.getPrimaryIndexDataflowHelper(dataset, KEY_TYPES,
+ RECORD_TYPE, META_TYPE, new NoMergePolicyFactory(), null, null);
+ dataflowHelper.open();
+ LSMBTree btree = (LSMBTree) dataflowHelper.getIndexInstance();
+ long lsn = btree.getMostRecentMarkerLSN();
+ int numOfMarkers = 0;
+ LogReader logReader = (LogReader) nc.getTransactionSubsystem().getLogManager().getLogReader(false);
+ long expectedMarkerId = markerId - 1;
+ while (lsn >= 0) {
+ numOfMarkers++;
+ ILogRecord logRecord = logReader.read(lsn);
+ lsn = logRecord.getPreviousMarkerLSN();
+ long logMarkerId = logRecord.getMarker().getLong();
+ Assert.assertEquals(expectedMarkerId, logMarkerId);
+ expectedMarkerId--;
+ }
+ logReader.close();
+ dataflowHelper.close();
+ Assert.assertEquals(markerId, numOfMarkers);
+ nc.newJobId();
+ TestTupleCounterFrameWriter countOp = create(nc.getSearchOutputDesc(KEY_TYPES, RECORD_TYPE, META_TYPE),
+ Collections.emptyList(), Collections.emptyList(), false);
+ IPushRuntime emptyTupleOp = nc.getFullScanPipeline(countOp, ctx, dataset, KEY_TYPES, RECORD_TYPE,
+ META_TYPE, new NoMergePolicyFactory(), null, null);
+ emptyTupleOp.open();
+ emptyTupleOp.close();
+ Assert.assertEquals(NUM_OF_RECORDS, countOp.getCount());
+ } finally {
+ nc.deInit();
+ }
+ } catch (Throwable e) {
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ }
+
+ }
+
+ public static TestTupleCounterFrameWriter create(RecordDescriptor recordDescriptor,
+ Collection<FrameWriterOperation> exceptionThrowingOperations,
+ Collection<FrameWriterOperation> errorThrowingOperations, boolean deepCopyInputFrames) {
+ CountAnswer openAnswer = FrameWriterTestUtils.createAnswer(FrameWriterOperation.Open,
+ exceptionThrowingOperations, errorThrowingOperations);
+ CountAnswer nextAnswer = FrameWriterTestUtils.createAnswer(FrameWriterOperation.NextFrame,
+ exceptionThrowingOperations, errorThrowingOperations);
+ CountAnswer flushAnswer = FrameWriterTestUtils.createAnswer(FrameWriterOperation.Flush,
+ exceptionThrowingOperations, errorThrowingOperations);
+ CountAnswer failAnswer = FrameWriterTestUtils.createAnswer(FrameWriterOperation.Fail,
+ exceptionThrowingOperations, errorThrowingOperations);
+ CountAnswer closeAnswer = FrameWriterTestUtils.createAnswer(FrameWriterOperation.Close,
+ exceptionThrowingOperations, errorThrowingOperations);
+ return new TestTupleCounterFrameWriter(recordDescriptor, openAnswer, nextAnswer, flushAnswer, failAnswer,
+ closeAnswer, deepCopyInputFrames);
+ }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7575785a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestRecordDescriptorFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestRecordDescriptorFactory.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestRecordDescriptorFactory.java
deleted file mode 100644
index 536bf3a..0000000
--- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestRecordDescriptorFactory.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.test.dataflow;
-
-import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
-import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
-
-public class TestRecordDescriptorFactory {
- public RecordDescriptor createRecordDescriptor(ISerializerDeserializer<?>... serdes) {
- return null;
- }
-}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7575785a/asterixdb/asterix-common/pom.xml
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/pom.xml b/asterixdb/asterix-common/pom.xml
index fc1c221..f6c0c99 100644
--- a/asterixdb/asterix-common/pom.xml
+++ b/asterixdb/asterix-common/pom.xml
@@ -252,6 +252,11 @@
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ <version>1.2.17</version>
+ </dependency>
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7575785a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
index b3eb281..71c30d5 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
@@ -79,7 +79,7 @@ public class PrimaryIndexOperationTracker extends BaseOperationTracker {
@Override
public synchronized void completeOperation(ILSMIndex index, LSMOperationType opType,
ISearchOperationCallback searchCallback, IModificationOperationCallback modificationCallback)
- throws HyracksDataException {
+ throws HyracksDataException {
if (opType == LSMOperationType.MODIFICATION || opType == LSMOperationType.FORCE_MODIFICATION) {
decrementNumActiveOperations(modificationCallback);
if (numActiveOperations.get() == 0) {
@@ -148,12 +148,12 @@ public class PrimaryIndexOperationTracker extends BaseOperationTracker {
for (ILSMIndex lsmIndex : dsInfo.getDatasetIndexes()) {
//get resource
- ILSMIndexAccessor accessor = lsmIndex.createAccessor(NoOpOperationCallback.INSTANCE,
- NoOpOperationCallback.INSTANCE);
+ ILSMIndexAccessor accessor =
+ lsmIndex.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE);
//update resource lsn
- AbstractLSMIOOperationCallback ioOpCallback = (AbstractLSMIOOperationCallback) lsmIndex
- .getIOOperationCallback();
+ AbstractLSMIOOperationCallback ioOpCallback =
+ (AbstractLSMIOOperationCallback) lsmIndex.getIOOperationCallback();
ioOpCallback.updateLastLSN(logRecord.getLSN());
//schedule flush after update
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7575785a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java
index 9a76b40..cf66d30 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/AsterixLSMInsertDeleteOperatorNodePushable.java
@@ -22,6 +22,8 @@ import java.nio.ByteBuffer;
import org.apache.asterix.common.api.IAsterixAppRuntimeContext;
import org.apache.asterix.common.exceptions.FrameDataException;
+import org.apache.asterix.common.transactions.ILogMarkerCallback;
+import org.apache.asterix.common.transactions.PrimaryIndexLogMarkerCallback;
import org.apache.hyracks.api.comm.VSizeFrame;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
@@ -31,6 +33,7 @@ import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
+import org.apache.hyracks.dataflow.common.util.TaskUtils;
import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
import org.apache.hyracks.storage.am.common.dataflow.IIndexOperatorDescriptor;
import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
@@ -41,7 +44,9 @@ import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
public class AsterixLSMInsertDeleteOperatorNodePushable extends LSMIndexInsertUpdateDeleteOperatorNodePushable {
+ public static final String KEY_INDEX = "Index";
private final boolean isPrimary;
+ // This class has both lsmIndex and index (in super class) pointing to the same object
private AbstractLSMIndex lsmIndex;
private int i = 0;
@@ -59,10 +64,6 @@ public class AsterixLSMInsertDeleteOperatorNodePushable extends LSMIndexInsertUp
private int currentTupleIdx;
private int lastFlushedTupleIdx;
- public boolean isPrimary() {
- return isPrimary;
- }
-
public AsterixLSMInsertDeleteOperatorNodePushable(IIndexOperatorDescriptor opDesc, IHyracksTaskContext ctx,
int partition, int[] fieldPermutation, IRecordDescriptorProvider recordDescProvider, IndexOperation op,
boolean isPrimary) {
@@ -79,6 +80,10 @@ public class AsterixLSMInsertDeleteOperatorNodePushable extends LSMIndexInsertUp
indexHelper.open();
lsmIndex = (AbstractLSMIndex) indexHelper.getIndexInstance();
try {
+ if (isPrimary && ctx.getSharedObject() != null) {
+ PrimaryIndexLogMarkerCallback callback = new PrimaryIndexLogMarkerCallback(lsmIndex);
+ TaskUtils.putInSharedMap(ILogMarkerCallback.KEY_MARKER_CALLBACK, callback, ctx);
+ }
writer.open();
modCallback = opDesc.getModificationOpCallbackFactory().createModificationOperationCallback(
indexHelper.getResourcePath(), indexHelper.getResourceID(), indexHelper.getResourcePartition(),
@@ -185,4 +190,8 @@ public class AsterixLSMInsertDeleteOperatorNodePushable extends LSMIndexInsertUp
writer.fail();
}
}
+
+ public boolean isPrimary() { // accessor for the isPrimary field of this pushable
+ return isPrimary;
+ }
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7575785a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogMarkerCallback.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogMarkerCallback.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogMarkerCallback.java
new file mode 100644
index 0000000..11d649b
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogMarkerCallback.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.transactions;
+
+import java.nio.ByteBuffer;
+
+/**
+ * This interface provides a callback mechanism for adding marker logs to the transaction log file
+ */
+public interface ILogMarkerCallback {
+
+ String KEY_MARKER_CALLBACK = "MARKER_CALLBACK";
+
+ /**
+ * Called before writing the marker log, allowing addition of specific information to the log record
+ *
+ * @param buffer
+ * the log buffer to write to
+ */
+ void before(ByteBuffer buffer);
+
+ /**
+ * Called after the log has been appended to the log tail, passing the position of the log used for random access
+ *
+ * @param lsn the position of the appended log, usable for random access
+ */
+ void after(long lsn);
+
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7575785a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
index cd05ba0..29af931 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ILogRecord.java
@@ -32,11 +32,38 @@ public interface ILogRecord {
LARGE_RECORD
}
- public static final int JOB_TERMINATE_LOG_SIZE = 14; //JOB_COMMIT or ABORT log type
- public static final int ENTITY_COMMIT_LOG_BASE_SIZE = 30;
- public static final int UPDATE_LOG_BASE_SIZE = 51;
- public static final int FLUSH_LOG_SIZE = 18;
- public static final int WAIT_LOG_SIZE = 14;
+ public static final int CHKSUM_LEN = Long.BYTES;
+ public static final int FLDCNT_LEN = Integer.BYTES;
+ public static final int DS_LEN = Integer.BYTES;
+ public static final int LOG_SOURCE_LEN = Byte.BYTES;
+ public static final int LOGRCD_SZ_LEN = Integer.BYTES;
+ public static final int NEWOP_LEN = Byte.BYTES;
+ public static final int NEWVALSZ_LEN = Integer.BYTES;
+ public static final int PKHASH_LEN = Integer.BYTES;
+ public static final int PKSZ_LEN = Integer.BYTES;
+ public static final int PRVLSN_LEN = Long.BYTES;
+ public static final int RS_PARTITION_LEN = Integer.BYTES;
+ public static final int RSID_LEN = Long.BYTES;
+ public static final int SEQ_NUM_LEN = Long.BYTES;
+ public static final int TYPE_LEN = Byte.BYTES;
+ public static final int UUID_LEN = Long.BYTES;
+ public static final int VBUCKET_ID_LEN = Short.BYTES;
+
+ public static final int ALL_RECORD_HEADER_LEN = LOG_SOURCE_LEN + TYPE_LEN + JobId.BYTES;
+ public static final int ENTITYCOMMIT_UPDATE_HEADER_LEN = RS_PARTITION_LEN + DatasetId.BYTES + PKHASH_LEN + PKSZ_LEN;
+ public static final int UPDATE_LSN_HEADER = RSID_LEN + LOGRCD_SZ_LEN;
+ public static final int UPDATE_BODY_HEADER = FLDCNT_LEN + NEWOP_LEN + NEWVALSZ_LEN;
+ // What are these fields? vvvvv
+ public static final int REMOTE_FLUSH_LOG_EXTRA_FIELDS_LEN = Long.BYTES + Integer.BYTES + Integer.BYTES;
+
+ // How are the following computed?
+ public static final int JOB_TERMINATE_LOG_SIZE = ALL_RECORD_HEADER_LEN + CHKSUM_LEN;
+ public static final int ENTITY_COMMIT_LOG_BASE_SIZE = 30; // ALL_RECORD_HEADER_LEN + CHKSUM_LEN +?
+ public static final int UPDATE_LOG_BASE_SIZE = 51; // ALL_RECORD_HEADER_LEN + CHKSUM_LEN +?
+ public static final int FLUSH_LOG_SIZE = 18; // ALL_RECORD_HEADER_LEN + CHKSUM_LEN +?
+ public static final int WAIT_LOG_SIZE = ALL_RECORD_HEADER_LEN + CHKSUM_LEN;
+ public static final int MARKER_BASE_LOG_SIZE =
+ ALL_RECORD_HEADER_LEN + CHKSUM_LEN + DS_LEN + RS_PARTITION_LEN + PRVLSN_LEN + LOGRCD_SZ_LEN;
public RecordReadStatus readLogRecord(ByteBuffer buffer);
@@ -135,7 +162,15 @@ public interface ILogRecord {
public ITupleReference getOldValue();
- public void setOldValue(ITupleReference oldValue);
+ public void setOldValue(ITupleReference tupleBefore);
- public void setOldValueSize(int oldValueSize);
+ public void setOldValueSize(int beforeSize);
+
+ public boolean isMarker();
+
+ public ByteBuffer getMarker();
+
+ public void logAppended(long lsn);
+
+ public long getPreviousMarkerLSN();
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7575785a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
index 23fdd0f..306b888 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
@@ -71,24 +71,6 @@ import org.apache.hyracks.storage.am.common.tuples.SimpleTupleWriter;
public class LogRecord implements ILogRecord {
- private static final int LOG_SOURCE_LEN = Byte.BYTES;
- private static final int TYPE_LEN = Byte.BYTES;
- public static final int PKHASH_LEN = Integer.BYTES;
- public static final int PKSZ_LEN = Integer.BYTES;
- private static final int RS_PARTITION_LEN = Integer.BYTES;
- private static final int RSID_LEN = Long.BYTES;
- private static final int LOGRCD_SZ_LEN = Integer.BYTES;
- private static final int FLDCNT_LEN = Integer.BYTES;
- private static final int NEWOP_LEN = Byte.BYTES;
- private static final int NEWVALSZ_LEN = Integer.BYTES;
- private static final int CHKSUM_LEN = Long.BYTES;
-
- private static final int ALL_RECORD_HEADER_LEN = LOG_SOURCE_LEN + TYPE_LEN + JobId.BYTES;
- private static final int ENTITYCOMMIT_UPDATE_HEADER_LEN = RS_PARTITION_LEN + DatasetId.BYTES + PKHASH_LEN
- + PKSZ_LEN;
- private static final int UPDATE_LSN_HEADER = RSID_LEN + LOGRCD_SZ_LEN;
- private static final int UPDATE_BODY_HEADER = FLDCNT_LEN + NEWOP_LEN + NEWVALSZ_LEN;
-
// ------------- fields in a log record (begin) ------------//
private byte logSource;
private byte logType;
@@ -108,8 +90,10 @@ public class LogRecord implements ILogRecord {
private ITupleReference oldValue;
private int oldValueFieldCount;
private long checksum;
+ private long prevMarkerLSN;
+ private ByteBuffer marker;
// ------------- fields in a log record (end) --------------//
-
+ private final ILogMarkerCallback callback; // A callback for log mark operations
private int PKFieldCnt;
private ITransactionContext txnCtx;
private long LSN;
@@ -129,7 +113,8 @@ public class LogRecord implements ILogRecord {
private String nodeId;
private boolean replicated = false;
- public LogRecord() {
+ public LogRecord(ILogMarkerCallback callback) {
+ this.callback = callback;
isFlushed = new AtomicBoolean(false);
readPKValue = new PrimaryKeyTupleReference();
readNewValue = SimpleTupleWriter.INSTANCE.createTupleReference();
@@ -138,49 +123,70 @@ public class LogRecord implements ILogRecord {
logSource = LogSource.LOCAL;
}
- private void writeLogRecordCommonFields(ByteBuffer buffer) {
+ public LogRecord() {
+ this(null);
+ }
+
+ private void doWriteLogRecord(ByteBuffer buffer) {
buffer.put(logSource);
buffer.put(logType);
buffer.putInt(jobId);
- if (logType == LogType.UPDATE || logType == LogType.ENTITY_COMMIT || logType == LogType.UPSERT_ENTITY_COMMIT) {
- buffer.putInt(resourcePartition);
- buffer.putInt(datasetId);
- buffer.putInt(PKHashValue);
- if (PKValueSize <= 0) {
- throw new IllegalStateException("Primary Key Size is less than or equal to 0");
- }
- buffer.putInt(PKValueSize);
- writePKValue(buffer);
- }
- if (logType == LogType.UPDATE) {
- buffer.putLong(resourceId);
- buffer.putInt(logSize);
- buffer.putInt(newValueFieldCount);
- buffer.put(newOp);
- buffer.putInt(newValueSize);
- writeTuple(buffer, newValue, newValueSize);
- if (oldValueSize > 0) {
- buffer.putInt(oldValueSize);
- buffer.putInt(oldValueFieldCount);
- writeTuple(buffer, oldValue, oldValueSize);
- }
+ switch (logType) {
+ case LogType.ENTITY_COMMIT:
+ case LogType.UPSERT_ENTITY_COMMIT:
+ writeEntityInfo(buffer);
+ break;
+ case LogType.UPDATE:
+ writeEntityInfo(buffer);
+ buffer.putLong(resourceId);
+ buffer.putInt(logSize);
+ buffer.putInt(newValueFieldCount);
+ buffer.put(newOp);
+ buffer.putInt(newValueSize);
+ writeTuple(buffer, newValue, newValueSize);
+ if (oldValueSize > 0) {
+ buffer.putInt(oldValueSize);
+ buffer.putInt(oldValueFieldCount);
+ writeTuple(buffer, oldValue, oldValueSize);
+ }
+ break;
+ case LogType.FLUSH:
+ buffer.putInt(datasetId);
+ break;
+ case LogType.MARKER:
+ buffer.putInt(datasetId);
+ buffer.putInt(resourcePartition);
+ callback.before(buffer);
+ buffer.putInt(logSize);
+ buffer.put(marker);
+ break;
+ default:
+ // Do nothing
}
- if (logType == LogType.FLUSH) {
- buffer.putInt(datasetId);
+ }
+
+ private void writeEntityInfo(ByteBuffer buffer) {
+ buffer.putInt(resourcePartition);
+ buffer.putInt(datasetId);
+ buffer.putInt(PKHashValue);
+ if (PKValueSize <= 0) {
+ throw new IllegalStateException("Primary Key Size is less than or equal to 0");
}
+ buffer.putInt(PKValueSize);
+ writePKValue(buffer);
}
@Override
public void writeLogRecord(ByteBuffer buffer) {
int beginOffset = buffer.position();
- writeLogRecordCommonFields(buffer);
+ doWriteLogRecord(buffer);
checksum = generateChecksum(buffer, beginOffset, logSize - CHKSUM_LEN);
buffer.putLong(checksum);
}
@Override
public void writeRemoteLogRecord(ByteBuffer buffer) {
- writeLogRecordCommonFields(buffer);
+ doWriteLogRecord(buffer);
if (logType == LogType.FLUSH) {
buffer.putLong(LSN);
buffer.putInt(numOfFlushedIndexes);
@@ -222,7 +228,7 @@ public class LogRecord implements ILogRecord {
int beginOffset = buffer.position();
//read common fields
- RecordReadStatus status = readLogCommonFields(buffer);
+ RecordReadStatus status = doReadLogRecord(buffer);
if (status != RecordReadStatus.OK) {
buffer.position(beginOffset);
return status;
@@ -241,7 +247,7 @@ public class LogRecord implements ILogRecord {
return RecordReadStatus.OK;
}
- private RecordReadStatus readLogCommonFields(ByteBuffer buffer) {
+ private RecordReadStatus doReadLogRecord(ByteBuffer buffer) {
//first we need the logtype and Job ID, if the buffer isn't that big, then no dice.
if (buffer.remaining() < ALL_RECORD_HEADER_LEN) {
return RecordReadStatus.TRUNCATED;
@@ -255,64 +261,88 @@ public class LogRecord implements ILogRecord {
return RecordReadStatus.TRUNCATED;
}
datasetId = buffer.getInt();
- resourceId = 0L;
+ resourceId = 0l;
+ // fall through
+ case LogType.WAIT:
+ computeAndSetLogSize();
break;
- case LogType.ABORT:
case LogType.JOB_COMMIT:
+ case LogType.ABORT:
datasetId = -1;
PKHashValue = -1;
+ computeAndSetLogSize();
break;
case LogType.ENTITY_COMMIT:
case LogType.UPSERT_ENTITY_COMMIT:
- if (!readEntityInfo(buffer)) {
+ if (readEntityInfo(buffer)) {
+ computeAndSetLogSize();
+ } else {
return RecordReadStatus.TRUNCATED;
}
break;
case LogType.UPDATE:
- if (!readEntityInfo(buffer)) {
+ if (readEntityInfo(buffer)) {
+ if (buffer.remaining() < UPDATE_LSN_HEADER + UPDATE_BODY_HEADER) {
+ return RecordReadStatus.TRUNCATED;
+ }
+ resourceId = buffer.getLong();
+ logSize = buffer.getInt();
+ newValueFieldCount = buffer.getInt();
+ newOp = buffer.get();
+ newValueSize = buffer.getInt();
+ if (buffer.remaining() < newValueSize) {
+ if (logSize > buffer.capacity()) {
+ return RecordReadStatus.LARGE_RECORD;
+ }
+ return RecordReadStatus.TRUNCATED;
+ }
+ newValue = readTuple(buffer, readNewValue, newValueFieldCount, newValueSize);
+ if (logSize > getUpdateLogSizeWithoutOldValue()) {
+ // Prev Image exists
+ if (buffer.remaining() < Integer.BYTES) {
+ return RecordReadStatus.TRUNCATED;
+ }
+ oldValueSize = buffer.getInt();
+ if (buffer.remaining() < Integer.BYTES) {
+ return RecordReadStatus.TRUNCATED;
+ }
+ oldValueFieldCount = buffer.getInt();
+ if (buffer.remaining() < oldValueSize) {
+ return RecordReadStatus.TRUNCATED;
+ }
+ oldValue = readTuple(buffer, readOldValue, oldValueFieldCount, oldValueSize);
+ } else {
+ oldValueSize = 0;
+ }
+ } else {
return RecordReadStatus.TRUNCATED;
}
- if (buffer.remaining() < UPDATE_LSN_HEADER + UPDATE_BODY_HEADER) {
+ break;
+ case LogType.MARKER:
+ if (buffer.remaining() < DS_LEN + RS_PARTITION_LEN + PRVLSN_LEN + LOGRCD_SZ_LEN) {
return RecordReadStatus.TRUNCATED;
}
- resourceId = buffer.getLong();
+ datasetId = buffer.getInt();
+ resourcePartition = buffer.getInt();
+ prevMarkerLSN = buffer.getLong();
logSize = buffer.getInt();
- newValueFieldCount = buffer.getInt();
- newOp = buffer.get();
- newValueSize = buffer.getInt();
- return readEntity(buffer);
+ int lenRemaining = logSize - MARKER_BASE_LOG_SIZE;
+ if (buffer.remaining() < lenRemaining) {
+ return RecordReadStatus.TRUNCATED;
+ }
+
+ if (marker == null || marker.capacity() < lenRemaining) {
+ // TODO(amoudi): account for memory allocation
+ marker = ByteBuffer.allocate(lenRemaining + Short.BYTES);
+ }
+ marker.clear();
+ buffer.get(marker.array(), 0, lenRemaining);
+ marker.position(lenRemaining);
+ marker.flip();
+ break;
default:
break;
}
- computeAndSetLogSize();
- return RecordReadStatus.OK;
- }
-
- private RecordReadStatus readEntity(ByteBuffer buffer) {
- if (buffer.remaining() < newValueSize) {
- if (logSize > buffer.capacity()) {
- return RecordReadStatus.LARGE_RECORD;
- }
- return RecordReadStatus.TRUNCATED;
- }
- newValue = readTuple(buffer, readNewValue, newValueFieldCount, newValueSize);
- if (logSize > getUpdateLogSizeWithoutOldValue()) {
- // Prev Image exists
- if (buffer.remaining() < Integer.BYTES) {
- return RecordReadStatus.TRUNCATED;
- }
- oldValueSize = buffer.getInt();
- if (buffer.remaining() < Integer.BYTES) {
- return RecordReadStatus.TRUNCATED;
- }
- oldValueFieldCount = buffer.getInt();
- if (buffer.remaining() < oldValueSize) {
- return RecordReadStatus.TRUNCATED;
- }
- oldValue = readTuple(buffer, readOldValue, oldValueFieldCount, oldValueSize);
- } else {
- oldValueSize = 0;
- }
return RecordReadStatus.OK;
}
@@ -339,7 +369,7 @@ public class LogRecord implements ILogRecord {
@Override
public void readRemoteLog(ByteBuffer buffer) {
//read common fields
- readLogCommonFields(buffer);
+ doReadLogRecord(buffer);
if (logType == LogType.FLUSH) {
LSN = buffer.getLong();
@@ -412,11 +442,18 @@ public class LogRecord implements ILogRecord {
case LogType.WAIT:
logSize = WAIT_LOG_SIZE;
break;
+ case LogType.MARKER:
+ setMarkerLogSize();
+ break;
default:
throw new IllegalStateException("Unsupported Log Type");
}
}
+ private void setMarkerLogSize() {
+ logSize = MARKER_BASE_LOG_SIZE + marker.remaining();
+ }
+
@Override
public String getLogRecordForDisplay() {
StringBuilder builder = new StringBuilder();
@@ -688,4 +725,28 @@ public class LogRecord implements ILogRecord {
public void setOldValueSize(int oldValueSize) {
this.oldValueSize = oldValueSize;
}
+
+ public void setMarker(ByteBuffer marker) {
+ this.marker = marker;
+ }
+
+ @Override
+ public boolean isMarker() {
+ return logType == LogType.MARKER;
+ }
+
+ /**
+  * Notifies the log marker callback, if one was supplied at construction time, that this record
+  * has been appended to the log at {@code lsn}.
+  * Records created through the no-argument constructor carry no callback; for those this is a no-op
+  * instead of a NullPointerException.
+  *
+  * @param lsn
+  *            the log sequence number passed on to the callback
+  */
+ @Override
+ public void logAppended(long lsn) {
+     if (callback != null) {
+         callback.after(lsn);
+     }
+ }
+
+ @Override
+ public long getPreviousMarkerLSN() {
+ return prevMarkerLSN;
+ }
+
+ @Override
+ public ByteBuffer getMarker() {
+ return marker;
+ }
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7575785a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogType.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogType.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogType.java
index 714b8f7..269e4b9 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogType.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogType.java
@@ -27,6 +27,7 @@ public class LogType {
public static final byte FLUSH = 4;
public static final byte UPSERT_ENTITY_COMMIT = 5;
public static final byte WAIT = 6;
+ public static final byte MARKER = 7;
private static final String STRING_UPDATE = "UPDATE";
private static final String STRING_JOB_COMMIT = "JOB_COMMIT";
@@ -35,8 +36,8 @@ public class LogType {
private static final String STRING_FLUSH = "FLUSH";
private static final String STRING_UPSERT_ENTITY_COMMIT = "UPSERT_ENTITY_COMMIT";
private static final String STRING_WAIT = "WAIT";
-
- private static final String STRING_INVALID_LOG_TYPE = "INVALID_LOG_TYPE";
+ private static final String STRING_MARKER = "MARKER";
+ private static final String STRING_UNKNOWN_LOG_TYPE = "UNKNOWN_LOG_TYPE";
public static String toString(byte logType) {
switch (logType) {
@@ -54,8 +55,10 @@ public class LogType {
return STRING_UPSERT_ENTITY_COMMIT;
case LogType.WAIT:
return STRING_WAIT;
+ case LogType.MARKER:
+ return STRING_MARKER;
default:
- return STRING_INVALID_LOG_TYPE;
+ return STRING_UNKNOWN_LOG_TYPE;
}
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7575785a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/PrimaryIndexLogMarkerCallback.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/PrimaryIndexLogMarkerCallback.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/PrimaryIndexLogMarkerCallback.java
new file mode 100644
index 0000000..7dae65f
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/PrimaryIndexLogMarkerCallback.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.transactions;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
+
+/**
+ * A basic {@link ILogMarkerCallback} used when writing marker records to the transaction log.
+ * It reads and updates the "most recent marker LSN" kept by the current memory component of an LSM index.
+ */
+public class PrimaryIndexLogMarkerCallback implements ILogMarkerCallback {
+
+    // The index whose current memory component stores the most recent marker LSN
+    private AbstractLSMIndex index;
+
+    /**
+     * @param index
+     *            the primary index used to store marker log info
+     * @throws HyracksDataException
+     *             declared for callers; nothing in this constructor throws it
+     */
+    public PrimaryIndexLogMarkerCallback(AbstractLSMIndex index) throws HyracksDataException {
+        this.index = index;
+    }
+
+    /**
+     * Writes the most recent marker LSN of the current memory component into {@code buffer} as a long.
+     *
+     * @param buffer
+     *            the buffer that receives the LSN at its current position
+     */
+    @Override
+    public void before(ByteBuffer buffer) {
+        final long previousMarkerLsn = index.getCurrentMemoryComponent().getMostRecentMarkerLSN();
+        buffer.putLong(previousMarkerLsn);
+    }
+
+    /**
+     * Records {@code lsn} as the most recent marker LSN of the current memory component.
+     *
+     * @param lsn
+     *            the LSN to store
+     */
+    @Override
+    public void after(long lsn) {
+        index.getCurrentMemoryComponent().setMostRecentMarkerLSN(lsn);
+    }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7575785a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/AsterixConstants.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/AsterixConstants.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/AsterixConstants.java
new file mode 100644
index 0000000..4bca216
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/AsterixConstants.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.utils;
+
+/**
+ * A static class that stores asterix constants
+ */
+public class AsterixConstants {
+ public static final String ASTERIX_ROOT_METADATA_DIR = "asterix_root_metadata";
+
+ private AsterixConstants() {
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7575785a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/FrameStack.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/FrameStack.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/FrameStack.java
new file mode 100644
index 0000000..8c83687
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/FrameStack.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.utils;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.util.ArrayDeque;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * A stack of fixed size frames that keeps up to {@code numOfMemoryFrames} frames in memory and
+ * spills any additional frames to a file on disk.
+ * The class is not designed for concurrent use; {@link #remaining()} in particular is unsynchronized.
+ */
+public class FrameStack implements Closeable {
+    private static final AtomicInteger stackIdGenerator = new AtomicInteger(0);
+    private static final String STACK_FILE_NAME = "stack";
+    private final int stackId;
+    private final int frameSize;
+    private final int numOfMemoryFrames;
+    // In-memory frames currently on the stack (null when no memory frames are configured)
+    private final ArrayDeque<ByteBuffer> fullBuffers;
+    // Recycled buffers available for reuse by push (null when no memory frames are configured)
+    private final ArrayDeque<ByteBuffer> emptyBuffers;
+    private int totalWriteCount = 0;
+    private int totalReadCount = 0;
+    private final File file;
+    private final RandomAccessFile iostream;
+    // Scratch array used to read a spilled frame back from disk
+    private final byte[] frame;
+
+    /**
+     * Create a hybrid of memory and disk stack of byte buffers
+     *
+     * @param dir
+     *            directory of the spill file; {@code null} means a relative path with no directory prefix
+     * @param frameSize
+     *            size in bytes of every frame stored in the stack
+     * @param numOfMemoryFrames
+     *            number of frames kept in memory before spilling to disk
+     * @throws HyracksDataException
+     *             if the spill file cannot be created
+     * @throws FileNotFoundException
+     *             if the spill file cannot be opened for reading and writing
+     */
+    public FrameStack(String dir, int frameSize, int numOfMemoryFrames)
+            throws HyracksDataException, FileNotFoundException {
+        this.stackId = stackIdGenerator.getAndIncrement();
+        this.frameSize = frameSize;
+        this.numOfMemoryFrames = numOfMemoryFrames;
+        this.fullBuffers = numOfMemoryFrames <= 0 ? null : new ArrayDeque<>();
+        this.emptyBuffers = numOfMemoryFrames <= 0 ? null : new ArrayDeque<>();
+        this.file = StoragePathUtil.createFile(
+                ((dir == null) ? "" : (dir.endsWith(File.separator) ? dir : (dir + File.separator))) + STACK_FILE_NAME,
+                stackId);
+        this.iostream = new RandomAccessFile(file, "rw");
+        this.frame = new byte[frameSize];
+    }
+
+    /**
+     * @return the number of remaining frames to be read in the stack
+     */
+    public int remaining() {
+        return totalWriteCount - totalReadCount;
+    }
+
+    /**
+     * copy content of buffer into the stack
+     *
+     * @param buffer
+     *            the frame to copy; its whole backing array is stored
+     * @throws IOException
+     *             if the frame has to be spilled and the write fails
+     */
+    public synchronized void push(ByteBuffer buffer) throws IOException {
+        int diff = totalWriteCount - totalReadCount;
+        if (diff < numOfMemoryFrames) {
+            ByteBuffer aBuffer = allocate();
+            aBuffer.put(buffer.array());
+            aBuffer.flip();
+            fullBuffers.push(aBuffer);
+        } else {
+            // Spilled frames are laid out back to back, indexed by their depth beyond the memory frames
+            long position = (long) (diff - numOfMemoryFrames) * frameSize;
+            if (position != iostream.getFilePointer()) {
+                iostream.seek(position);
+            }
+            iostream.write(buffer.array());
+        }
+        totalWriteCount++;
+    }
+
+    /**
+     * @return a cleared buffer, recycled from a previous pop when one is available
+     */
+    private ByteBuffer allocate() {
+        ByteBuffer aBuffer = emptyBuffers.poll();
+        if (aBuffer == null) {
+            aBuffer = ByteBuffer.allocate(frameSize);
+        }
+        aBuffer.clear();
+        return aBuffer;
+    }
+
+    /**
+     * Free a frame off of the stack and copy it into dest.
+     * If the stack is empty, dest is left empty (cleared and flipped).
+     *
+     * @param dest
+     *            the buffer that receives the frame; it is cleared first and flipped afterwards
+     * @throws IOException
+     *             if a spilled frame cannot be read back from disk
+     */
+    public synchronized void pop(ByteBuffer dest) throws IOException {
+        dest.clear();
+        int diff = totalWriteCount - totalReadCount - 1;
+        if (diff >= 0) {
+            if (diff < numOfMemoryFrames) {
+                totalReadCount++;
+                ByteBuffer aBuffer = fullBuffers.pop();
+                emptyBuffers.push(aBuffer);
+                dest.put(aBuffer.array());
+            } else {
+                long position = (long) (diff - numOfMemoryFrames) * frameSize;
+                iostream.seek(position);
+                iostream.readFully(frame);
+                dest.put(frame);
+                // A spilled frame must be consumed as well; otherwise the same frame is returned
+                // by every later pop and remaining() never decreases
+                totalReadCount++;
+            }
+        }
+        dest.flip();
+    }
+
+    /**
+     * Closing this stack will result in the data being deleted
+     *
+     * @throws IOException
+     *             if closing the stream or deleting the spill file fails
+     */
+    @Override
+    public void close() throws IOException {
+        try {
+            iostream.close();
+        } finally {
+            // Delete the spill file even when closing the stream fails
+            Files.delete(file.toPath());
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7575785a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
index 78b06fb..615e8af 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
@@ -24,12 +24,16 @@ import org.apache.asterix.common.cluster.ClusterPartition;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.dataflow.std.file.ConstantFileSplitProvider;
import org.apache.hyracks.dataflow.std.file.FileSplit;
import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
public class StoragePathUtil {
+ private static final Logger LOGGER = Logger.getLogger(StoragePathUtil.class.getName());
public static final String PARTITION_DIR_PREFIX = "partition_";
public static final String TEMP_DATASETS_STORAGE_FOLDER = "temp";
public static final String DATASET_INDEX_NAME_SEPARATOR = "_idx_";
@@ -70,4 +74,39 @@ public class StoragePathUtil {
public static int getPartitionNumFromName(String name) {
return Integer.parseInt(name.substring(PARTITION_DIR_PREFIX.length()));
}
+
+ /**
+  * Creates a new empty file named {@code name + "_" + count}, creating missing parent directories first.
+  * Note: this method is not thread safe. It is the responsibility of the caller to ensure no path conflict when
+  * creating files simultaneously
+  *
+  * @param name
+  *            the path prefix of the file to create
+  * @param count
+  *            the suffix appended to {@code name} after an underscore
+  * @return the newly created file
+  * @throws HyracksDataException
+  *             if the file already exists, cannot be created, or any other failure occurs while creating it
+  */
+ public static File createFile(String name, int count) throws HyracksDataException {
+     try {
+         String fileName = name + "_" + count;
+         File file = new File(fileName);
+         if (file.getParentFile() != null) {
+             file.getParentFile().mkdirs();
+         }
+         if (file.exists()) {
+             throw new HyracksDataException("spill file " + fileName + " already exists");
+         }
+         if (!file.createNewFile()) {
+             throw new HyracksDataException("Unable to create spill file " + fileName);
+         }
+         if (LOGGER.isEnabledFor(Level.INFO)) {
+             LOGGER.info("Created spill file " + file.getAbsolutePath());
+         }
+         return file;
+     } catch (HyracksDataException e) {
+         // Already carries a descriptive message; rethrow as is instead of wrapping it a second time
+         throw e;
+     } catch (Exception e) {
+         throw new HyracksDataException(e);
+     }
+ }
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7575785a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/TransactionUtil.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/TransactionUtil.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/TransactionUtil.java
index 1d5b15e..2878d5a 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/TransactionUtil.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/TransactionUtil.java
@@ -18,9 +18,12 @@
*/
package org.apache.asterix.common.utils;
+import java.nio.ByteBuffer;
+
import org.apache.asterix.common.context.PrimaryIndexOperationTracker;
import org.apache.asterix.common.transactions.ITransactionContext;
import org.apache.asterix.common.transactions.LogRecord;
+import org.apache.asterix.common.transactions.LogSource;
import org.apache.asterix.common.transactions.LogType;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
@@ -64,4 +67,17 @@ public class TransactionUtil {
logRecord.computeAndSetPKValueSize();
logRecord.computeAndSetLogSize();
}
+
+ /**
+  * Fills {@code logRecord} so that it describes a locally produced MARKER log record.
+  * One byte is consumed from {@code marker} before it is attached to the record, so the caller's
+  * buffer position advances by one. The log size is computed last, after the marker has been set.
+  *
+  * @param logRecord
+  *            the record to populate
+  * @param txnCtx
+  *            the transaction context; its job id is copied into the record
+  * @param datasetId
+  *            the dataset id stored in the record
+  * @param resourcePartition
+  *            the resource partition stored in the record
+  * @param marker
+  *            the marker content, preceded by one byte that is not part of the marker object
+  */
+ public static void formMarkerLogRecord(LogRecord logRecord, ITransactionContext txnCtx, int datasetId,
+         int resourcePartition, ByteBuffer marker) {
+     logRecord.setTxnCtx(txnCtx);
+     logRecord.setLogSource(LogSource.LOCAL);
+     logRecord.setLogType(LogType.MARKER);
+     final int jobId = txnCtx.getJobId().getId();
+     logRecord.setJobId(jobId);
+     logRecord.setDatasetId(datasetId);
+     logRecord.setResourcePartition(resourcePartition);
+     // read the first byte since it is not part of the marker object
+     marker.get();
+     logRecord.setMarker(marker);
+     logRecord.computeAndSetLogSize();
+ }
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7575785a/asterixdb/asterix-events/src/main/java/org/apache/asterix/event/util/AsterixConstants.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-events/src/main/java/org/apache/asterix/event/util/AsterixConstants.java b/asterixdb/asterix-events/src/main/java/org/apache/asterix/event/util/AsterixConstants.java
deleted file mode 100644
index 11a2510..0000000
--- a/asterixdb/asterix-events/src/main/java/org/apache/asterix/event/util/AsterixConstants.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.event.util;
-
-public class AsterixConstants {
-
- public static String ASTERIX_ROOT_METADATA_DIR = "asterix_root_metadata";
-
-}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7575785a/asterixdb/asterix-events/src/main/java/org/apache/asterix/event/util/PatternCreator.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-events/src/main/java/org/apache/asterix/event/util/PatternCreator.java b/asterixdb/asterix-events/src/main/java/org/apache/asterix/event/util/PatternCreator.java
index b19a722..1780a51 100644
--- a/asterixdb/asterix-events/src/main/java/org/apache/asterix/event/util/PatternCreator.java
+++ b/asterixdb/asterix-events/src/main/java/org/apache/asterix/event/util/PatternCreator.java
@@ -27,6 +27,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.apache.asterix.common.utils.AsterixConstants;
import org.apache.asterix.event.driver.EventDriver;
import org.apache.asterix.event.error.VerificationUtil;
import org.apache.asterix.event.model.AsterixInstance;
@@ -72,8 +73,8 @@ public class PatternCreator {
for (Node node : cluster.getNode()) {
if (copyHyracksToNC) {
- Pattern copyHyracksForNC = createCopyHyracksPattern(asterixInstanceName, cluster, node.getClusterIp(),
- destDir);
+ Pattern copyHyracksForNC =
+ createCopyHyracksPattern(asterixInstanceName, cluster, node.getClusterIp(), destDir);
ps.add(copyHyracksForNC);
}
}
@@ -389,8 +390,8 @@ public class PatternCreator {
Nodeid nodeid = new Nodeid(new Value(null, EventDriver.CLIENT_NODE.getId()));
String username = cluster.getUsername() != null ? cluster.getUsername() : System.getProperty("user.name");
String workingDir = cluster.getWorkingDir().getDir();
- String destDir = workingDir + File.separator + "library" + File.separator + dataverse + File.separator
- + libraryName;
+ String destDir =
+ workingDir + File.separator + "library" + File.separator + dataverse + File.separator + libraryName;
String fileToTransfer = new File(libraryPath).getAbsolutePath();
Iterator<Node> installTargets = cluster.getNode().iterator();
@@ -434,8 +435,8 @@ public class PatternCreator {
patternList.add(p);
Iterator<Node> uninstallTargets = cluster.getNode().iterator();
- String libDir = workingDir + File.separator + "library" + File.separator + dataverse + File.separator
- + libraryName;
+ String libDir =
+ workingDir + File.separator + "library" + File.separator + dataverse + File.separator + libraryName;
Node uninstallNode = uninstallTargets.next();
nodeid = new Nodeid(new Value(null, uninstallNode.getId()));
event = new Event("file_delete", nodeid, libDir);
@@ -606,8 +607,8 @@ public class PatternCreator {
String username = cluster.getUsername() == null ? System.getProperty("user.name") : cluster.getUsername();
String srcHost = cluster.getMasterNode().getClientIp();
Nodeid nodeid = new Nodeid(new Value(null, EventDriver.CLIENT_NODE.getId()));
- String srcDir = cluster.getMasterNode().getLogDir() == null ? cluster.getLogDir()
- : cluster.getMasterNode().getLogDir();
+ String srcDir =
+ cluster.getMasterNode().getLogDir() == null ? cluster.getLogDir() : cluster.getMasterNode().getLogDir();
String destDir = outputDir + File.separator + "cc";
String pargs = username + " " + srcHost + " " + srcDir + " " + destDir;
Event event = new Event("directory_copy", nodeid, pargs);
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7575785a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IFeedMarker.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IFeedMarker.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IFeedMarker.java
new file mode 100644
index 0000000..487b47d
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IFeedMarker.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.api;
+
+import org.apache.hyracks.api.comm.VSizeFrame;
+
+public interface IFeedMarker {
+
+ /**
+ * Mark the frame with a mark denoting the progress of the feed.
+ * The mark will eventually be written to the transaction log.
+ *
+ * @param mark
+ * a frame to write the progress mark in
+ * @return true if a progress mark was written into the frame and should be broadcast, false otherwise
+ */
+ public boolean mark(VSizeFrame mark);
+
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7575785a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordConverter.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordConverter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordConverter.java
index 0f5ada4..9d9ff28 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordConverter.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordConverter.java
@@ -20,6 +20,7 @@ package org.apache.asterix.external.api;
import java.io.IOException;
+@FunctionalInterface
public interface IRecordConverter<I, O> {
public O convert(IRawRecord<? extends I> input) throws IOException;
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7575785a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReader.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReader.java
index 9cce1c9..08ffe18 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IRecordReader.java
@@ -73,4 +73,8 @@ public interface IRecordReader<T> extends Closeable {
* gives the record reader a chance to recover from IO errors during feed intake
*/
public boolean handleException(Throwable th);
+
+ public default IFeedMarker getProgressReporter() {
+ return null;
+ }
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7575785a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedDataFlowController.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedDataFlowController.java
index a301ac9..7806489 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedDataFlowController.java
@@ -34,9 +34,9 @@ public class ChangeFeedDataFlowController<T> extends FeedRecordDataFlowControlle
public ChangeFeedDataFlowController(final IHyracksTaskContext ctx, final FeedTupleForwarder tupleForwarder,
final FeedLogManager feedLogManager, final int numOfOutputFields,
- final IRecordWithPKDataParser<T> dataParser, final IRecordReader<T> recordReader)
+ final IRecordWithPKDataParser<T> dataParser, final IRecordReader<T> recordReader, boolean sendMarker)
throws HyracksDataException {
- super(ctx, tupleForwarder, feedLogManager, numOfOutputFields, dataParser, recordReader);
+ super(ctx, tupleForwarder, feedLogManager, numOfOutputFields, dataParser, recordReader, sendMarker);
this.dataParser = dataParser;
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7575785a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedWithMetaDataFlowController.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedWithMetaDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedWithMetaDataFlowController.java
index b47d278..7d65c52 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedWithMetaDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedWithMetaDataFlowController.java
@@ -32,9 +32,9 @@ public class ChangeFeedWithMetaDataFlowController<T> extends FeedWithMetaDataFlo
public ChangeFeedWithMetaDataFlowController(final IHyracksTaskContext ctx, final FeedTupleForwarder tupleForwarder,
final FeedLogManager feedLogManager, final int numOfOutputFields,
- final IRecordWithMetadataParser<T> dataParser, final IRecordReader<T> recordReader)
- throws HyracksDataException {
- super(ctx, tupleForwarder, feedLogManager, numOfOutputFields, dataParser, recordReader);
+ final IRecordWithMetadataParser<T> dataParser, final IRecordReader<T> recordReader, boolean sendMarker)
+ throws HyracksDataException {
+ super(ctx, tupleForwarder, feedLogManager, numOfOutputFields, dataParser, recordReader, sendMarker);
}
@Override
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7575785a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
index 87daffa..be9056b 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
@@ -19,10 +19,15 @@
package org.apache.asterix.external.dataflow;
import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nonnull;
+import org.apache.asterix.external.api.IFeedMarker;
import org.apache.asterix.external.api.IRawRecord;
import org.apache.asterix.external.api.IRecordDataParser;
import org.apache.asterix.external.api.IRecordReader;
@@ -30,9 +35,13 @@ import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.ExternalDataExceptionUtils;
import org.apache.asterix.external.util.FeedLogManager;
import org.apache.hyracks.api.comm.IFrameWriter;
+import org.apache.hyracks.api.comm.VSizeFrame;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.util.HyracksConstants;
import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
+import org.apache.hyracks.dataflow.common.io.MessagingFrameTupleAppender;
+import org.apache.hyracks.dataflow.common.util.TaskUtils;
import org.apache.log4j.Logger;
public class FeedRecordDataFlowController<T> extends AbstractFeedDataFlowController {
@@ -40,38 +49,52 @@ public class FeedRecordDataFlowController<T> extends AbstractFeedDataFlowControl
protected final IRecordDataParser<T> dataParser;
protected final IRecordReader<? extends T> recordReader;
protected final AtomicBoolean closed = new AtomicBoolean(false);
- protected final long interval = 1000;
+ protected static final long INTERVAL = 1000;
+ protected final Object mutex = new Object();
+ protected final boolean sendMarker;
protected boolean failed = false;
public FeedRecordDataFlowController(IHyracksTaskContext ctx, FeedTupleForwarder tupleForwarder,
@Nonnull FeedLogManager feedLogManager, int numOfOutputFields, @Nonnull IRecordDataParser<T> dataParser,
- @Nonnull IRecordReader<T> recordReader) throws HyracksDataException {
+ @Nonnull IRecordReader<T> recordReader, boolean sendMarker) throws HyracksDataException {
super(ctx, tupleForwarder, feedLogManager, numOfOutputFields);
this.dataParser = dataParser;
this.recordReader = recordReader;
+ this.sendMarker = sendMarker;
recordReader.setFeedLogManager(feedLogManager);
recordReader.setController(this);
}
@Override
public void start(IFrameWriter writer) throws HyracksDataException {
+ ExecutorService executorService = sendMarker ? Executors.newSingleThreadExecutor() : null;
+ Future<?> result = null;
+ if (sendMarker) {
+ DataflowMarker dataflowMarker = new DataflowMarker(recordReader.getProgressReporter(),
+ TaskUtils.<VSizeFrame> get(HyracksConstants.KEY_MESSAGE, ctx));
+ result = executorService.submit(dataflowMarker);
+ }
HyracksDataException hde = null;
try {
failed = false;
tupleForwarder.initialize(ctx, writer);
while (recordReader.hasNext()) {
- IRawRecord<? extends T> record = recordReader.next();
- if (record == null) {
- flush();
- Thread.sleep(interval);
- continue;
+ // synchronize on mutex before we call next() so we don't send a marker before its record
+ synchronized (mutex) {
+ IRawRecord<? extends T> record = recordReader.next();
+ if (record == null) {
+ flush();
+ wait(INTERVAL);
+ continue;
+ }
+ tb.reset();
+ parseAndForward(record);
}
- tb.reset();
- parseAndForward(record);
}
} catch (InterruptedException e) {
//TODO: Find out what could cause an interrupted exception beside termination of a job/feed
- LOGGER.warn("Feed has been interrupted. Closing the feed");
+ LOGGER.warn("Feed has been interrupted. Closing the feed", e);
+ Thread.currentThread().interrupt();
} catch (Exception e) {
failed = true;
tupleForwarder.flush();
@@ -90,10 +113,13 @@ public class FeedRecordDataFlowController<T> extends AbstractFeedDataFlowControl
hde = ExternalDataExceptionUtils.suppressIntoHyracksDataException(hde, th);
} finally {
closeSignal();
- if (hde != null) {
- throw hde;
+ if (sendMarker && result != null) {
+ result.cancel(true);
}
}
+ if (hde != null) {
+ throw hde;
+ }
}
private void parseAndForward(IRawRecord<? extends T> record) throws IOException {
@@ -170,4 +196,53 @@ public class FeedRecordDataFlowController<T> extends AbstractFeedDataFlowControl
// This is not a parser record. most likely, this error happened in the record reader.
return recordReader.handleException(th);
}
+
+ private class DataflowMarker implements Runnable {
+ private final IFeedMarker marker;
+ private final VSizeFrame mark;
+ private volatile boolean stopped = false;
+
+ public DataflowMarker(IFeedMarker marker, VSizeFrame mark) {
+ this.marker = marker;
+ this.mark = mark;
+ }
+
+ public synchronized void stop() {
+ stopped = true;
+ notify();
+ }
+
+ @Override
+ public void run() {
+ try {
+ while (true) {
+ synchronized (this) {
+ if (!stopped) {
+ // TODO (amoudi): find a better reactive way to do this
+ // sleep for two seconds
+ wait(TimeUnit.SECONDS.toMillis(2));
+ } else {
+ break;
+ }
+ }
+ synchronized (mutex) {
+ if (marker.mark(mark)) {
+ // broadcast
+ tupleForwarder.flush();
+ // clear
+ mark.getBuffer().clear();
+ mark.getBuffer().put(MessagingFrameTupleAppender.NULL_FEED_MESSAGE);
+ mark.getBuffer().flip();
+ }
+ }
+ }
+ } catch (InterruptedException e) {
+ LOGGER.warn("Marker stopped", e);
+ Thread.currentThread().interrupt();
+ return;
+ } catch (Exception e) {
+ LOGGER.warn("Marker stopped", e);
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/7575785a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java
index f1eb870..36c6c2f 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedTupleForwarder.java
@@ -30,10 +30,12 @@ import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.comm.VSizeFrame;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.util.HyracksConstants;
import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
import org.apache.hyracks.dataflow.common.io.MessagingFrameTupleAppender;
+import org.apache.hyracks.dataflow.common.util.TaskUtils;
public class FeedTupleForwarder implements ITupleForwarder {
@@ -59,7 +61,7 @@ public class FeedTupleForwarder implements ITupleForwarder {
this.writer = writer;
this.appender = new FrameTupleAppender(frame);
// Set null feed message
- VSizeFrame message = (VSizeFrame) ctx.getSharedObject();
+ VSizeFrame message = TaskUtils.<VSizeFrame> get(HyracksConstants.KEY_MESSAGE, ctx);
// a null message
message.getBuffer().put(MessagingFrameTupleAppender.NULL_FEED_MESSAGE);
message.getBuffer().flip();