You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by mh...@apache.org on 2016/02/18 10:54:28 UTC
[2/6] incubator-asterixdb git commit: Asterix NCs Failback Support
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
index 06a1957..ef2b498 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java
@@ -34,8 +34,8 @@ import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFa
import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
-public class SecondaryIndexModificationOperationCallbackFactory extends AbstractOperationCallbackFactory implements
- IModificationOperationCallbackFactory {
+public class SecondaryIndexModificationOperationCallbackFactory extends AbstractOperationCallbackFactory
+ implements IModificationOperationCallbackFactory {
private static final long serialVersionUID = 1L;
private final IndexOperation indexOp;
@@ -48,7 +48,7 @@ public class SecondaryIndexModificationOperationCallbackFactory extends Abstract
@Override
public IModificationOperationCallback createModificationOperationCallback(String resourcePath, long resourceId,
- Object resource, IHyracksTaskContext ctx) throws HyracksDataException {
+ int resourcePartition, Object resource, IHyracksTaskContext ctx) throws HyracksDataException {
ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx);
IIndexLifecycleManager indexLifeCycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
.getDatasetLifecycleManager();
@@ -60,8 +60,8 @@ public class SecondaryIndexModificationOperationCallbackFactory extends Abstract
try {
ITransactionContext txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(jobId, false);
IModificationOperationCallback modCallback = new SecondaryIndexModificationOperationCallback(datasetId,
- primaryKeyFields, txnCtx, txnSubsystem.getLockManager(), txnSubsystem, resourceId, resourceType,
- indexOp);
+ primaryKeyFields, txnCtx, txnSubsystem.getLockManager(), txnSubsystem, resourceId,
+ resourcePartition, resourceType, indexOp);
txnCtx.registerIndexAndCallback(resourceId, index, (AbstractOperationCallback) modCallback, false);
return modCallback;
} catch (ACIDException e) {
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetIndexModificationOperationCallback.java
----------------------------------------------------------------------
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetIndexModificationOperationCallback.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetIndexModificationOperationCallback.java
index 69aad24..32d3461 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetIndexModificationOperationCallback.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetIndexModificationOperationCallback.java
@@ -39,8 +39,9 @@ public class TempDatasetIndexModificationOperationCallback extends AbstractIndex
public TempDatasetIndexModificationOperationCallback(int datasetId, int[] primaryKeyFields,
ITransactionContext txnCtx, ILockManager lockManager, ITransactionSubsystem txnSubsystem, long resourceId,
- byte resourceType, IndexOperation indexOp) {
- super(datasetId, primaryKeyFields, txnCtx, lockManager, txnSubsystem, resourceId, resourceType, indexOp);
+ int resourcePartition, byte resourceType, IndexOperation indexOp) {
+ super(datasetId, primaryKeyFields, txnCtx, lockManager, txnSubsystem, resourceId, resourcePartition,
+ resourceType, indexOp);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java
index f2a6820..b08798c 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetPrimaryIndexModificationOperationCallbackFactory.java
@@ -49,7 +49,7 @@ public class TempDatasetPrimaryIndexModificationOperationCallbackFactory extends
@Override
public IModificationOperationCallback createModificationOperationCallback(String resourcePath, long resourceId,
- Object resource, IHyracksTaskContext ctx) throws HyracksDataException {
+ int resourcePartition, Object resource, IHyracksTaskContext ctx) throws HyracksDataException {
ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx);
IIndexLifecycleManager indexLifeCycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
.getDatasetLifecycleManager();
@@ -61,8 +61,8 @@ public class TempDatasetPrimaryIndexModificationOperationCallbackFactory extends
try {
ITransactionContext txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(jobId, false);
IModificationOperationCallback modCallback = new TempDatasetIndexModificationOperationCallback(datasetId,
- primaryKeyFields, txnCtx, txnSubsystem.getLockManager(), txnSubsystem, resourceId, resourceType,
- indexOp);
+ primaryKeyFields, txnCtx, txnSubsystem.getLockManager(), txnSubsystem, resourceId,
+ resourcePartition, resourceType, indexOp);
txnCtx.registerIndexAndCallback(resourceId, index, (AbstractOperationCallback) modCallback, true);
return modCallback;
} catch (ACIDException e) {
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java
index 8d838a3..403d68d 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/TempDatasetSecondaryIndexModificationOperationCallbackFactory.java
@@ -49,7 +49,7 @@ public class TempDatasetSecondaryIndexModificationOperationCallbackFactory exten
@Override
public IModificationOperationCallback createModificationOperationCallback(String resourcePath, long resourceId,
- Object resource, IHyracksTaskContext ctx) throws HyracksDataException {
+ int resourcePartition, Object resource, IHyracksTaskContext ctx) throws HyracksDataException {
ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx);
IIndexLifecycleManager indexLifeCycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
.getDatasetLifecycleManager();
@@ -61,8 +61,8 @@ public class TempDatasetSecondaryIndexModificationOperationCallbackFactory exten
try {
ITransactionContext txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(jobId, false);
IModificationOperationCallback modCallback = new TempDatasetIndexModificationOperationCallback(datasetId,
- primaryKeyFields, txnCtx, txnSubsystem.getLockManager(), txnSubsystem, resourceId, resourceType,
- indexOp);
+ primaryKeyFields, txnCtx, txnSubsystem.getLockManager(), txnSubsystem, resourceId,
+ resourcePartition, resourceType, indexOp);
txnCtx.registerIndexAndCallback(resourceId, index, (AbstractOperationCallback) modCallback, false);
return modCallback;
} catch (ACIDException e) {
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallback.java
----------------------------------------------------------------------
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallback.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallback.java
index dfc622a..f98083a 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallback.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallback.java
@@ -31,9 +31,10 @@ public class UpsertOperationCallback extends AbstractIndexModificationOperationC
implements IModificationOperationCallback {
public UpsertOperationCallback(int datasetId, int[] primaryKeyFields, ITransactionContext txnCtx,
- ILockManager lockManager, ITransactionSubsystem txnSubsystem, long resourceId, byte resourceType,
- IndexOperation indexOp) {
- super(datasetId, primaryKeyFields, txnCtx, lockManager, txnSubsystem, resourceId, resourceType, indexOp);
+ ILockManager lockManager, ITransactionSubsystem txnSubsystem, long resourceId, int resourcePartition,
+ byte resourceType, IndexOperation indexOp) {
+ super(datasetId, primaryKeyFields, txnCtx, lockManager, txnSubsystem, resourceId, resourcePartition,
+ resourceType, indexOp);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java
index 0c83ab5..707f986 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java
@@ -47,7 +47,7 @@ public class UpsertOperationCallbackFactory extends AbstractOperationCallbackFac
@Override
public IModificationOperationCallback createModificationOperationCallback(String resourceName, long resourceId,
- Object resource, IHyracksTaskContext ctx) throws HyracksDataException {
+ int resourcePartition, Object resource, IHyracksTaskContext ctx) throws HyracksDataException {
ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx);
IIndexLifecycleManager indexLifeCycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
@@ -60,7 +60,8 @@ public class UpsertOperationCallbackFactory extends AbstractOperationCallbackFac
try {
ITransactionContext txnCtx = txnSubsystem.getTransactionManager().getTransactionContext(jobId, false);
IModificationOperationCallback modCallback = new UpsertOperationCallback(datasetId, primaryKeyFields,
- txnCtx, txnSubsystem.getLockManager(), txnSubsystem, resourceId, resourceType, indexOp);
+ txnCtx, txnSubsystem.getLockManager(), txnSubsystem, resourceId, resourcePartition, resourceType,
+ indexOp);
txnCtx.registerIndexAndCallback(resourceId, index, (AbstractOperationCallback) modCallback, true);
return modCallback;
} catch (ACIDException e) {
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
----------------------------------------------------------------------
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index 655fd2a..9cb456f 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -28,11 +28,13 @@ import java.io.ObjectOutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.SortedMap;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -40,6 +42,7 @@ import org.apache.asterix.common.cluster.ClusterPartition;
import org.apache.asterix.common.config.AsterixMetadataProperties;
import org.apache.asterix.common.replication.AsterixReplicationJob;
import org.apache.asterix.common.replication.IReplicationManager;
+import org.apache.asterix.common.utils.StoragePathUtil;
import org.apache.commons.io.FileUtils;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.IODeviceHandle;
@@ -67,6 +70,9 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito
private boolean isReplicationEnabled = false;
private Set<String> filesToBeReplicated;
private final SortedMap<Integer, ClusterPartition> clusterPartitions;
+ private final Set<Integer> nodeOriginalPartitions;
+ private final Set<Integer> nodeActivePartitions;
+ private Set<Integer> nodeInactivePartitions;
public PersistentLocalResourceRepository(List<IODeviceHandle> devices, String nodeId,
AsterixMetadataProperties metadataProperties) throws HyracksDataException {
@@ -86,6 +92,15 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito
}
}
resourceCache = CacheBuilder.newBuilder().maximumSize(MAX_CACHED_RESOURCES).build();
+
+ ClusterPartition[] nodePartitions = metadataProperties.getNodePartitions().get(nodeId);
+ //initially the node active partitions are the same as the original partitions
+ nodeOriginalPartitions = new HashSet<>(nodePartitions.length);
+ nodeActivePartitions = new HashSet<>(nodePartitions.length);
+ for (ClusterPartition partition : nodePartitions) {
+ nodeOriginalPartitions.add(partition.getPartitionId());
+ nodeActivePartitions.add(partition.getPartitionId());
+ }
}
private static String getStorageMetadataDirPath(String mountPoint, String nodeId, int ioDeviceId) {
@@ -301,6 +316,7 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito
}
private static final FilenameFilter METADATA_FILES_FILTER = new FilenameFilter() {
+ @Override
public boolean accept(File dir, String name) {
if (name.equalsIgnoreCase(METADATA_FILE_NAME)) {
return true;
@@ -316,6 +332,7 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito
if (isReplicationEnabled) {
filesToBeReplicated = new HashSet<String>();
+ nodeInactivePartitions = ConcurrentHashMap.newKeySet();
}
}
@@ -404,4 +421,43 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito
//currently each partition is replicated on the same IO device number on all NCs.
return mountPoints[clusterPartitions.get(partition).getIODeviceNum()];
}
-}
+
+ public Set<Integer> getActivePartitions() {
+ return Collections.unmodifiableSet(nodeActivePartitions);
+ }
+
+ public Set<Integer> getInactivePartitions() {
+ return Collections.unmodifiableSet(nodeInactivePartitions);
+ }
+
+ public Set<Integer> getNodeOrignalPartitions() {
+ return Collections.unmodifiableSet(nodeOriginalPartitions);
+ }
+
+ public synchronized void addActivePartition(int partitonId) {
+ nodeActivePartitions.add(partitonId);
+ nodeInactivePartitions.remove(partitonId);
+ }
+
+ public synchronized void addInactivePartition(int partitonId) {
+ nodeInactivePartitions.add(partitonId);
+ nodeActivePartitions.remove(partitonId);
+ }
+
+ /**
+ * @param resourceAbsolutePath
+ * @return the resource relative path starting from the partition directory
+ */
+ public static String getResourceRelativePath(String resourceAbsolutePath) {
+ String[] tokens = resourceAbsolutePath.split(File.separator);
+ //partiton/dataverse/idx/fileName
+ return tokens[tokens.length - 4] + File.separator + tokens[tokens.length - 3] + File.separator
+ + tokens[tokens.length - 2] + File.separator + tokens[tokens.length - 1];
+ }
+
+ public static int getResourcePartition(String resourceAbsolutePath) {
+ String[] tokens = resourceAbsolutePath.split(File.separator);
+ //partiton/dataverse/idx/fileName
+ return StoragePathUtil.getPartitonNumFromName(tokens[tokens.length - 4]);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
----------------------------------------------------------------------
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
index 1966c39..5649710 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
@@ -41,6 +41,9 @@ public class LogManagerWithReplication extends LogManager {
throw new IllegalStateException();
}
+ //only locally generated logs should be replicated
+ logRecord.setReplicated(logRecord.getLogSource() == LogSource.LOCAL);
+
//Remote flush logs do not need to be flushed separately since they may not trigger local flush
if (logRecord.getLogType() == LogType.FLUSH && logRecord.getLogSource() == LogSource.LOCAL) {
flushLogsQ.offer(logRecord);
@@ -54,7 +57,7 @@ public class LogManagerWithReplication extends LogManager {
protected void appendToLogTail(ILogRecord logRecord) throws ACIDException {
syncAppendToLogTail(logRecord);
- if (logRecord.getLogSource() == LogSource.LOCAL) {
+ if (logRecord.isReplicated()) {
replicationManager.replicateLog(logRecord);
}
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogReader.java
----------------------------------------------------------------------
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogReader.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogReader.java
index e0cddee..a018dc2 100644
--- a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogReader.java
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogReader.java
@@ -26,11 +26,10 @@ import java.util.logging.Logger;
import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.transactions.ILogReader;
import org.apache.asterix.common.transactions.ILogRecord;
+import org.apache.asterix.common.transactions.ILogRecord.RECORD_STATUS;
import org.apache.asterix.common.transactions.LogRecord;
import org.apache.asterix.common.transactions.MutableLong;
-import static org.apache.asterix.common.transactions.LogRecord.*;
-
/**
* NOTE: Many method calls of this class are not thread safe.
* Be very cautious using it in a multithreaded context.
http://git-wip-us.apache.org/repos/asf/incubator-asterixdb/blob/98d38e6a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/RemoteLogReader.java
----------------------------------------------------------------------
diff --git a/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/RemoteLogReader.java b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/RemoteLogReader.java
new file mode 100644
index 0000000..8f88321
--- /dev/null
+++ b/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/RemoteLogReader.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.transaction.management.service.logging;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+
+import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.common.transactions.ILogReader;
+import org.apache.asterix.common.transactions.ILogRecord;
+import org.apache.asterix.common.transactions.ILogRecord.RECORD_STATUS;
+import org.apache.asterix.common.transactions.LogRecord;
+
+public class RemoteLogReader implements ILogReader {
+
+ private final FileChannel fileChannel;
+ private final ILogRecord logRecord;
+ private final ByteBuffer readBuffer;
+ private long readLSN;
+ private final int logPageSize;
+
+ public RemoteLogReader(FileChannel fileChannel, long logFileSize, int logPageSize) {
+ this.fileChannel = fileChannel;
+ this.logPageSize = logPageSize;
+ logRecord = new LogRecord();
+ readBuffer = ByteBuffer.allocate(logPageSize);
+ }
+
+ @Override
+ public void initializeScan(long beginLSN) throws ACIDException {
+ readLSN = beginLSN;
+ fillLogReadBuffer();
+ }
+
+ private boolean fillLogReadBuffer() throws ACIDException {
+ int size = 0;
+ int read = 0;
+ readBuffer.position(0);
+ readBuffer.limit(logPageSize);
+ try {
+ fileChannel.position(readLSN);
+ //We loop here because read() may return 0, but this simply means we are waiting on IO.
+ //Therefore we want to break out only when either the buffer is full, or we reach EOF.
+ while (size < logPageSize && read != -1) {
+ read = fileChannel.read(readBuffer);
+ if (read > 0) {
+ size += read;
+ }
+ }
+ } catch (IOException e) {
+ throw new ACIDException(e);
+ }
+ readBuffer.position(0);
+ readBuffer.limit(size);
+ if (size == 0 && read == -1) {
+ return false; //EOF
+ }
+ return true;
+ }
+
+ @Override
+ public ILogRecord read(long LSN) throws ACIDException {
+ throw new UnsupportedOperationException("Random read is not supported.");
+ }
+
+ @Override
+ public ILogRecord next() throws ACIDException {
+ if (readBuffer.position() == readBuffer.limit()) {
+ boolean hasRemaining = fillLogReadBuffer();
+ if (!hasRemaining) {
+ return null;
+ }
+ }
+
+ RECORD_STATUS status = logRecord.readRemoteLog(readBuffer, true);
+ switch (status) {
+ case TRUNCATED: {
+ //we may have just read off the end of the buffer, so try refiling it
+ if (!fillLogReadBuffer()) {
+ return null;
+ }
+ //now see what we have in the refilled buffer
+ status = logRecord.readRemoteLog(readBuffer, true);
+ switch (status) {
+ case TRUNCATED: {
+ return null;
+ }
+ case OK:
+ break;
+ default:
+ break;
+ }
+ //if we have exited the inner switch,
+ // this means status is really "OK" after buffer refill
+ break;
+ }
+ case BAD_CHKSUM: {
+ return null;
+ }
+ case OK:
+ break;
+ }
+
+ readLSN += logRecord.getSerializedLogSize();
+ return logRecord;
+ }
+
+ @Override
+ public void close() throws ACIDException {
+ try {
+ if (fileChannel != null) {
+ if (fileChannel.isOpen()) {
+ fileChannel.close();
+ }
+ }
+ } catch (IOException e) {
+ throw new ACIDException(e);
+ }
+ }
+
+}