You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by am...@apache.org on 2018/05/21 03:35:23 UTC
[3/5] asterixdb git commit: [NO ISSUE][STO] Misc Storage Fixes and
Improvements
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10a3f21d/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
index 681669f..52c8962 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
@@ -19,7 +19,10 @@
package org.apache.asterix.common.context;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
@@ -37,16 +40,19 @@ import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent.ComponentState;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
+import org.apache.hyracks.storage.am.lsm.common.api.IoOperationCompleteListener;
import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
+import org.apache.hyracks.storage.am.lsm.common.impls.FlushOperation;
import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId;
import org.apache.hyracks.storage.common.IModificationOperationCallback;
import org.apache.hyracks.storage.common.ISearchOperationCallback;
-public class PrimaryIndexOperationTracker extends BaseOperationTracker {
+public class PrimaryIndexOperationTracker extends BaseOperationTracker implements IoOperationCompleteListener {
private final int partition;
// Number of active operations on an ILSMIndex instance.
private final AtomicInteger numActiveOperations;
@@ -54,6 +60,7 @@ public class PrimaryIndexOperationTracker extends BaseOperationTracker {
private final ILSMComponentIdGenerator idGenerator;
private boolean flushOnExit = false;
private boolean flushLogCreated = false;
+ private final Map<String, FlushOperation> scheduledFlushes = new HashMap<>();
public PrimaryIndexOperationTracker(int datasetID, int partition, ILogManager logManager, DatasetInfo dsInfo,
ILSMComponentIdGenerator idGenerator) {
@@ -107,6 +114,7 @@ public class PrimaryIndexOperationTracker extends BaseOperationTracker {
}
if (needsFlush || flushOnExit) {
+ flushOnExit = false;
// make the current mutable components READABLE_UNWRITABLE to stop coming modify operations from entering
// them until the current flush is scheduled.
LSMComponentId primaryId = null;
@@ -143,7 +151,6 @@ public class PrimaryIndexOperationTracker extends BaseOperationTracker {
throw new IllegalStateException("Primary index not found in dataset " + dsInfo.getDatasetID());
}
LogRecord logRecord = new LogRecord();
- flushOnExit = false;
if (dsInfo.isDurable()) {
/*
* Generate a FLUSH log.
@@ -182,16 +189,36 @@ public class PrimaryIndexOperationTracker extends BaseOperationTracker {
Map<String, Object> flushMap = new HashMap<>();
flushMap.put(LSMIOOperationCallback.KEY_FLUSH_LOG_LSN, flushLsn);
flushMap.put(LSMIOOperationCallback.KEY_NEXT_COMPONENT_ID, nextComponentId);
- for (ILSMIndex lsmIndex : dsInfo.getDatasetPartitionOpenIndexes(partition)) {
- ILSMIndexAccessor accessor = lsmIndex.createAccessor(NoOpIndexAccessParameters.INSTANCE);
- accessor.getOpContext().setParameters(flushMap);
- accessor.scheduleFlush();
+ synchronized (scheduledFlushes) {
+ for (ILSMIndex lsmIndex : dsInfo.getDatasetPartitionOpenIndexes(partition)) {
+ ILSMIndexAccessor accessor = lsmIndex.createAccessor(NoOpIndexAccessParameters.INSTANCE);
+ accessor.getOpContext().setParameters(flushMap);
+ ILSMIOOperation flush = accessor.scheduleFlush();
+ scheduledFlushes.put(flush.getTarget().getRelativePath(), (FlushOperation) flush);
+ flush.addCompleteListener(this);
+ }
}
} finally {
flushLogCreated = false;
}
}
+ @Override
+ public void completed(ILSMIOOperation operation) {
+ synchronized (scheduledFlushes) {
+ scheduledFlushes.remove(operation.getTarget().getRelativePath());
+ }
+ }
+
+ public List<FlushOperation> getScheduledFlushes() {
+ synchronized (scheduledFlushes) {
+ Collection<FlushOperation> scheduled = scheduledFlushes.values();
+ List<FlushOperation> flushes = new ArrayList<FlushOperation>(scheduled.size());
+ flushes.addAll(scheduled);
+ return flushes;
+ }
+ }
+
public int getNumActiveOperations() {
return numActiveOperations.get();
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10a3f21d/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMIndexUtil.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMIndexUtil.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMIndexUtil.java
index 99ab2d0..71d16f7 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMIndexUtil.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMIndexUtil.java
@@ -18,9 +18,12 @@
*/
package org.apache.asterix.common.dataflow;
+import java.util.List;
+
import org.apache.asterix.common.ioopcallbacks.LSMIOOperationCallback;
import org.apache.asterix.common.transactions.ILogManager;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
public class LSMIndexUtil {
@@ -41,4 +44,15 @@ public class LSMIndexUtil {
}
}
}
+
+ public static void waitFor(List<? extends ILSMIOOperation> ioOperations) throws HyracksDataException {
+ for (int i = 0; i < ioOperations.size(); i++) {
+ try {
+ ioOperations.get(i).sync();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw HyracksDataException.create(e);
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10a3f21d/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
index 50f5906..ea53d68 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
@@ -20,19 +20,21 @@
package org.apache.asterix.common.ioopcallbacks;
import java.util.ArrayDeque;
+import java.util.Collection;
import java.util.Deque;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
import org.apache.asterix.common.context.DatasetInfo;
import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
import org.apache.asterix.common.storage.ResourceReference;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.data.std.primitive.LongPointable;
import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
import org.apache.hyracks.storage.am.common.api.IMetadataPageManager;
import org.apache.hyracks.storage.am.common.freepage.MutableArrayValueReference;
+import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
@@ -45,13 +47,19 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent;
import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager;
import org.apache.hyracks.storage.am.lsm.common.impls.DiskComponentMetadata;
import org.apache.hyracks.storage.am.lsm.common.impls.FlushOperation;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId;
import org.apache.hyracks.storage.am.lsm.common.util.ComponentUtils;
import org.apache.hyracks.storage.am.lsm.common.util.LSMComponentIdUtils;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
// A single LSMIOOperationCallback per LSM index used to perform actions around Flush and Merge operations
public class LSMIOOperationCallback implements ILSMIOOperationCallback {
+ private static final Logger LOGGER = LogManager.getLogger();
public static final String KEY_FLUSH_LOG_LSN = "FlushLogLsn";
public static final String KEY_NEXT_COMPONENT_ID = "NextComponentId";
+ public static final String KEY_FLUSHED_COMPONENT_ID = "FlushedComponentId";
private static final String KEY_FIRST_LSN = "FirstLsn";
private static final MutableArrayValueReference KEY_METADATA_FLUSH_LOG_LSN =
new MutableArrayValueReference(KEY_FLUSH_LOG_LSN.getBytes());
@@ -83,8 +91,11 @@ public class LSMIOOperationCallback implements ILSMIOOperationCallback {
if (operation.getStatus() == LSMIOOperationStatus.FAILURE) {
return;
}
- if (operation.getIOOpertionType() == LSMIOOperationType.FLUSH) {
- Map<String, Object> map = operation.getAccessor().getOpContext().getParameters();
+ if (operation.getIOOpertionType() == LSMIOOperationType.LOAD) {
+ Map<String, Object> map = operation.getParameters();
+ putComponentIdIntoMetadata(operation.getNewComponent(), (LSMComponentId) map.get(KEY_FLUSHED_COMPONENT_ID));
+ } else if (operation.getIOOpertionType() == LSMIOOperationType.FLUSH) {
+ Map<String, Object> map = operation.getParameters();
putLSNIntoMetadata(operation.getNewComponent(), (Long) map.get(KEY_FLUSH_LOG_LSN));
putComponentIdIntoMetadata(operation.getNewComponent(),
((FlushOperation) operation).getFlushingComponent().getId());
@@ -104,16 +115,64 @@ public class LSMIOOperationCallback implements ILSMIOOperationCallback {
if (operation.getStatus() == LSMIOOperationStatus.FAILURE) {
return;
}
- if (operation.getIOOpertionType() == LSMIOOperationType.FLUSH) {
- Map<String, Object> map = operation.getAccessor().getOpContext().getParameters();
- final Long lsn = (Long) map.get(KEY_FLUSH_LOG_LSN);
- final Optional<String> componentFile =
- operation.getNewComponent().getLSMComponentPhysicalFiles().stream().findAny();
- if (componentFile.isPresent()) {
- final ResourceReference ref = ResourceReference.of(componentFile.get());
- final String componentEndTime = AbstractLSMIndexFileManager.getComponentEndTime(ref.getName());
- indexCheckpointManagerProvider.get(ref).flushed(componentEndTime, lsn);
+ if (operation.getIOOpertionType() != LSMIOOperationType.LOAD
+ && operation.getAccessor().getOpContext().getOperation() == IndexOperation.DELETE_COMPONENTS) {
+ deleteComponentsFromCheckpoint(operation);
+ } else if (operation.getIOOpertionType() == LSMIOOperationType.FLUSH
+ || operation.getIOOpertionType() == LSMIOOperationType.LOAD) {
+ addComponentToCheckpoint(operation);
+ }
+ }
+
+ private void addComponentToCheckpoint(ILSMIOOperation operation) throws HyracksDataException {
+ // will always update the checkpoint file even if no new component was created
+ FileReference target = operation.getTarget();
+ Map<String, Object> map = operation.getParameters();
+ final Long lsn =
+ operation.getIOOpertionType() == LSMIOOperationType.FLUSH ? (Long) map.get(KEY_FLUSH_LOG_LSN) : 0L;
+ final LSMComponentId id = (LSMComponentId) map.get(KEY_FLUSHED_COMPONENT_ID);
+ final ResourceReference ref = ResourceReference.of(target.getAbsolutePath());
+ final String componentEndTime = AbstractLSMIndexFileManager.getComponentEndTime(ref.getName());
+ indexCheckpointManagerProvider.get(ref).flushed(componentEndTime, lsn, id.getMaxId());
+ }
+
+ private void deleteComponentsFromCheckpoint(ILSMIOOperation operation) throws HyracksDataException {
+ // component was deleted... if a flush, do nothing.. if a merge, must update the checkpoint file
+ if (operation.getIOOpertionType() == LSMIOOperationType.MERGE) {
+ // Get component id of the last disk component
+ LSMComponentId mostRecentComponentId =
+ getMostRecentComponentId(operation.getAccessor().getOpContext().getComponentsToBeMerged());
+ // Update the checkpoint file
+ FileReference target = operation.getTarget();
+ final ResourceReference ref = ResourceReference.of(target.getAbsolutePath());
+ indexCheckpointManagerProvider.get(ref).setLastComponentId(mostRecentComponentId.getMaxId());
+ } else if (operation.getIOOpertionType() != LSMIOOperationType.FLUSH) {
+ throw new IllegalStateException("Unexpected IO operation: " + operation.getIOOpertionType());
+ }
+ }
+
+ private LSMComponentId getMostRecentComponentId(Collection<ILSMDiskComponent> deletedComponents)
+ throws HyracksDataException {
+ // must sync on opTracker to ensure list of components doesn't change
+ synchronized (lsmIndex.getOperationTracker()) {
+ List<ILSMDiskComponent> diskComponents = lsmIndex.getDiskComponents();
+ if (diskComponents.isEmpty()) {
+ LOGGER.log(Level.INFO, "There are no disk components");
+ return LSMComponentId.EMPTY_INDEX_LAST_COMPONENT_ID;
+ }
+ if (deletedComponents.contains(diskComponents.get(diskComponents.size() - 1))) {
+ LOGGER.log(Level.INFO, "All disk components have been deleted");
+ return LSMComponentId.EMPTY_INDEX_LAST_COMPONENT_ID;
}
+ int mostRecentComponentIndex = 0;
+ for (int i = 0; i < diskComponents.size(); i++) {
+ if (!deletedComponents.contains(diskComponents.get(i))) {
+ break;
+ }
+ mostRecentComponentIndex++;
+ }
+ ILSMDiskComponent mostRecentDiskComponent = diskComponents.get(mostRecentComponentIndex);
+ return (LSMComponentId) mostRecentDiskComponent.getId();
}
}
@@ -153,14 +212,6 @@ public class LSMIOOperationCallback implements ILSMIOOperationCallback {
LSMComponentIdUtils.persist(componentId, newComponent.getMetadata());
}
- /**
- * Used during the recovery process to force refresh the next component id
- */
- public void forceRefreshNextId(ILSMComponentId nextComponentId) {
- componentIds.clear();
- componentIds.add(nextComponentId);
- }
-
public synchronized void setFirstLsnForCurrentMemoryComponent(long firstLsn) {
this.firstLsnForCurrentMemoryComponent = firstLsn;
if (pendingFlushes == 0) {
@@ -195,9 +246,11 @@ public class LSMIOOperationCallback implements ILSMIOOperationCallback {
dsInfo.declareActiveIOOperation();
if (operation.getIOOpertionType() == LSMIOOperationType.FLUSH) {
pendingFlushes++;
+ FlushOperation flush = (FlushOperation) operation;
Map<String, Object> map = operation.getAccessor().getOpContext().getParameters();
Long flushLsn = (Long) map.get(KEY_FLUSH_LOG_LSN);
map.put(KEY_FIRST_LSN, firstLsnForCurrentMemoryComponent);
+ map.put(KEY_FLUSHED_COMPONENT_ID, flush.getFlushingComponent().getId());
componentIds.add((ILSMComponentId) map.get(KEY_NEXT_COMPONENT_ID));
firstLsnForCurrentMemoryComponent = flushLsn; // Advance the first lsn for new component
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10a3f21d/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IIndexCheckpointManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IIndexCheckpointManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IIndexCheckpointManager.java
index b008f11..2c0872c 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IIndexCheckpointManager.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IIndexCheckpointManager.java
@@ -27,31 +27,33 @@ public interface IIndexCheckpointManager {
/**
* Initializes the first checkpoint of an index with low watermark {@code lsn}
*
+ * @param componentTimestamp
* @param lsn
* @throws HyracksDataException
*/
- void init(long lsn) throws HyracksDataException;
+ void init(String componentTimestamp, long lsn) throws HyracksDataException;
/**
- * Called when a new LSM disk component is flushed. When called, the index checkpoiint is updated
+ * Called when a new LSM disk component is flushed. When called, the index checkpoint is updated
* with the latest valid {@code componentTimestamp} and low watermark {@code lsn}
*
* @param componentTimestamp
* @param lsn
* @throws HyracksDataException
*/
- void flushed(String componentTimestamp, long lsn) throws HyracksDataException;
+ void flushed(String componentTimestamp, long lsn, long componentId) throws HyracksDataException;
/**
- * Called when a new LSM disk component is replicated from master. When called, the index checkpoiint is updated
+ * Called when a new LSM disk component is replicated from master. When called, the index checkpoint is updated
* with the latest valid {@code componentTimestamp} and the local lsn mapping of {@code masterLsn} is set as the
* new low watermark.
*
* @param componentTimestamp
* @param masterLsn
+ * @param componentId
* @throws HyracksDataException
*/
- void replicated(String componentTimestamp, long masterLsn) throws HyracksDataException;
+ void replicated(String componentTimestamp, long masterLsn, long componentId) throws HyracksDataException;
/**
* Called when a flush log is received and replicated from master. The mapping between
@@ -89,13 +91,37 @@ public interface IIndexCheckpointManager {
* Gets the index last valid component timestamp if the index has any components. Otherwise {@link Optional#empty()}
*
* @return the index last valid component timestamp
+ * @throws HyracksDataException
*/
- Optional<String> getValidComponentTimestamp();
+ Optional<String> getValidComponentTimestamp() throws HyracksDataException;
/**
* Gets the number of valid checkpoints the index has.
*
* @return the number of valid checkpoints
+ * @throws HyracksDataException
+ */
+ int getCheckpointCount() throws HyracksDataException;
+
+ /**
+ * @return the latest checkpoint
+ * @throws HyracksDataException
+ */
+ IndexCheckpoint getLatest() throws HyracksDataException;
+
+ /**
+ * Advance the last valid component timestamp. Used for replicated bulkloaded components
+ *
+ * @param timeStamp
+ * @throws HyracksDataException
+ */
+ void advanceValidComponentTimestamp(String timeStamp) throws HyracksDataException;
+
+ /**
+ * Set the last component id. Used during recovery or after component delete
+ *
+ * @param componentId
+ * @throws HyracksDataException
*/
- int getCheckpointCount();
+ void setLastComponentId(long componentId) throws HyracksDataException;
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10a3f21d/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java
index 6e845e1..73d3122 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java
@@ -23,6 +23,7 @@ import java.util.HashMap;
import java.util.Map;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.core.JsonProcessingException;
@@ -35,24 +36,28 @@ public class IndexCheckpoint {
private long id;
private String validComponentTimestamp;
private long lowWatermark;
+ private long lastComponentId;
private Map<Long, Long> masterNodeFlushMap;
- public static IndexCheckpoint first(long lowWatermark) {
+ public static IndexCheckpoint first(String lastComponentTimestamp, long lowWatermark) {
IndexCheckpoint firstCheckpoint = new IndexCheckpoint();
firstCheckpoint.id = INITIAL_CHECKPOINT_ID;
firstCheckpoint.lowWatermark = lowWatermark;
- firstCheckpoint.validComponentTimestamp = null;
+ firstCheckpoint.validComponentTimestamp = lastComponentTimestamp;
+ firstCheckpoint.lastComponentId = LSMComponentId.EMPTY_INDEX_LAST_COMPONENT_ID.getMaxId();
firstCheckpoint.masterNodeFlushMap = new HashMap<>();
return firstCheckpoint;
}
- public static IndexCheckpoint next(IndexCheckpoint latest, long lowWatermark, String validComponentTimestamp) {
+ public static IndexCheckpoint next(IndexCheckpoint latest, long lowWatermark, String validComponentTimestamp,
+ long lastComponentId) {
if (lowWatermark < latest.getLowWatermark()) {
throw new IllegalStateException("Low watermark should always be increasing");
}
IndexCheckpoint next = new IndexCheckpoint();
next.id = latest.getId() + 1;
next.lowWatermark = lowWatermark;
+ next.lastComponentId = lastComponentId;
next.validComponentTimestamp = validComponentTimestamp;
next.masterNodeFlushMap = latest.getMasterNodeFlushMap();
// remove any lsn from the map that wont be used anymore
@@ -72,6 +77,10 @@ public class IndexCheckpoint {
return lowWatermark;
}
+ public long getLastComponentId() {
+ return lastComponentId;
+ }
+
public Map<Long, Long> getMasterNodeFlushMap() {
return masterNodeFlushMap;
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10a3f21d/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManagerFactory.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManagerFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManagerFactory.java
new file mode 100644
index 0000000..1da5c9c
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IRecoveryManagerFactory.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.transactions;
+
+import org.apache.hyracks.api.application.INCServiceContext;
+
+public interface IRecoveryManagerFactory {
+
+ /**
+ * Create the local recovery manager
+ *
+ * @param serviceCtx
+ * the service context
+ * @param txnSubsystem
+ * the transaction subsystem
+ * @return the recovery manager
+ */
+ IRecoveryManager createRecoveryManager(INCServiceContext serviceCtx, ITransactionSubsystem txnSubsystem);
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10a3f21d/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
index 5fdb4e2..0c3b21d 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogRecord.java
@@ -24,7 +24,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.zip.CRC32;
import org.apache.asterix.common.context.PrimaryIndexOperationTracker;
-import org.apache.commons.lang3.ArrayUtils;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
import org.apache.hyracks.storage.am.common.tuples.SimpleTupleReference;
import org.apache.hyracks.storage.am.common.tuples.SimpleTupleWriter;
@@ -271,6 +270,7 @@ public class LogRecord implements ILogRecord {
computeAndSetLogSize();
break;
case LogType.WAIT:
+ case LogType.WAIT_FOR_FLUSHES:
computeAndSetLogSize();
break;
case LogType.JOB_COMMIT:
@@ -462,6 +462,7 @@ public class LogRecord implements ILogRecord {
logSize = FLUSH_LOG_SIZE;
break;
case LogType.WAIT:
+ case LogType.WAIT_FOR_FLUSHES:
logSize = WAIT_LOG_SIZE;
break;
case LogType.FILTER:
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10a3f21d/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogType.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogType.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogType.java
index f02b0de..2d76a11 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogType.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/LogType.java
@@ -28,6 +28,7 @@ public class LogType {
public static final byte WAIT = 6;
public static final byte FILTER = 7;
public static final byte MARKER = 8;
+ public static final byte WAIT_FOR_FLUSHES = 9;
private static final String STRING_UPDATE = "UPDATE";
private static final String STRING_JOB_COMMIT = "JOB_COMMIT";
@@ -37,6 +38,7 @@ public class LogType {
private static final String STRING_WAIT = "WAIT";
private static final String STRING_FILTER = "FILTER";
private static final String STRING_MARKER = "MARKER";
+ private static final String STRING_WAIT_FOR_FLUSHES = "WAIT_FOR_FLUSHES";
private static final String STRING_UNKNOWN_LOG_TYPE = "UNKNOWN_LOG_TYPE";
public static String toString(byte logType) {
@@ -53,6 +55,8 @@ public class LogType {
return STRING_FLUSH;
case LogType.WAIT:
return STRING_WAIT;
+ case LogType.WAIT_FOR_FLUSHES:
+ return STRING_WAIT_FOR_FLUSHES;
case LogType.FILTER:
return STRING_FILTER;
case LogType.MARKER:
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10a3f21d/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
index 6b13468..aa2c7af 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java
@@ -19,17 +19,17 @@
package org.apache.asterix.common.utils;
import java.io.File;
+import java.nio.file.Path;
import java.nio.file.Paths;
-import java.util.function.Function;
import org.apache.asterix.common.cluster.ClusterPartition;
-import org.apache.asterix.common.storage.IndexPathElements;
import org.apache.asterix.common.storage.ResourceReference;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileSplit;
+import org.apache.hyracks.api.io.IIOManager;
import org.apache.hyracks.api.io.MappedFileSplit;
import org.apache.hyracks.dataflow.std.file.ConstantFileSplitProvider;
import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
@@ -143,4 +143,16 @@ public class StoragePathUtil {
public static String getIndexNameFromPath(String path) {
return Paths.get(path).getFileName().toString();
}
+
+ /**
+ * Get the path of the index containing the passed reference
+ *
+ * @param ioManager
+ * @param ref
+ * @return
+ * @throws HyracksDataException
+ */
+ public static Path getIndexPath(IIOManager ioManager, ResourceReference ref) throws HyracksDataException {
+ return ioManager.resolve(ref.getRelativePath().toString()).getFile().toPath();
+ }
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10a3f21d/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMIOOperationCallbackTest.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMIOOperationCallbackTest.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMIOOperationCallbackTest.java
index 7af7b6e..29a2aa0 100644
--- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMIOOperationCallbackTest.java
+++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/LSMIOOperationCallbackTest.java
@@ -19,6 +19,9 @@
package org.apache.asterix.test.ioopcallbacks;
+import java.text.Format;
+import java.text.SimpleDateFormat;
+import java.util.Date;
import java.util.HashMap;
import java.util.Map;
@@ -35,6 +38,7 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent;
+import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager;
import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMMemoryComponent;
import org.apache.hyracks.storage.am.lsm.common.impls.DiskComponentMetadata;
import org.apache.hyracks.storage.am.lsm.common.impls.FlushOperation;
@@ -60,6 +64,15 @@ public class LSMIOOperationCallbackTest extends TestCase {
* 7. destroy
*/
+ private static final Format FORMATTER =
+ new SimpleDateFormat(AbstractLSMIndexFileManager.COMPONENT_TIMESTAMP_FORMAT);
+
+ private static String getComponentFileName() {
+ Date date = new Date();
+ String ts = FORMATTER.format(date);
+ return ts + '_' + ts;
+ }
+
@Test
public void testNormalSequence() throws HyracksDataException {
int numMemoryComponents = 2;
@@ -81,7 +94,7 @@ public class LSMIOOperationCallbackTest extends TestCase {
flushMap.put(LSMIOOperationCallback.KEY_NEXT_COMPONENT_ID, nextComponentId);
ILSMIndexAccessor firstAccessor = new TestLSMIndexAccessor(new TestLSMIndexOperationContext(mockIndex));
firstAccessor.getOpContext().setParameters(flushMap);
- FileReference firstTarget = new FileReference(Mockito.mock(IODeviceHandle.class), "path");
+ FileReference firstTarget = new FileReference(Mockito.mock(IODeviceHandle.class), getComponentFileName());
LSMComponentFileReferences firstFiles = new LSMComponentFileReferences(firstTarget, firstTarget, firstTarget);
FlushOperation firstFlush = new TestFlushOperation(firstAccessor, firstTarget, callback, indexId, firstFiles,
new LSMComponentId(0, 0));
@@ -97,7 +110,7 @@ public class LSMIOOperationCallbackTest extends TestCase {
flushMap.put(LSMIOOperationCallback.KEY_NEXT_COMPONENT_ID, nextComponentId);
ILSMIndexAccessor secondAccessor = new TestLSMIndexAccessor(new TestLSMIndexOperationContext(mockIndex));
secondAccessor.getOpContext().setParameters(flushMap);
- FileReference secondTarget = new FileReference(Mockito.mock(IODeviceHandle.class), "path");
+ FileReference secondTarget = new FileReference(Mockito.mock(IODeviceHandle.class), getComponentFileName());
LSMComponentFileReferences secondFiles =
new LSMComponentFileReferences(secondTarget, secondTarget, secondTarget);
FlushOperation secondFlush = new TestFlushOperation(secondAccessor, secondTarget, callback, indexId,
@@ -175,7 +188,7 @@ public class LSMIOOperationCallbackTest extends TestCase {
flushMap.put(LSMIOOperationCallback.KEY_NEXT_COMPONENT_ID, expectedId);
ILSMIndexAccessor accessor = new TestLSMIndexAccessor(new TestLSMIndexOperationContext(mockIndex));
accessor.getOpContext().setParameters(flushMap);
- FileReference target = new FileReference(Mockito.mock(IODeviceHandle.class), "path");
+ FileReference target = new FileReference(Mockito.mock(IODeviceHandle.class), getComponentFileName());
LSMComponentFileReferences files = new LSMComponentFileReferences(target, target, target);
FlushOperation flush =
new TestFlushOperation(accessor, target, callback, indexId, files, new LSMComponentId(0, 0));
@@ -210,7 +223,7 @@ public class LSMIOOperationCallbackTest extends TestCase {
IIndexCheckpointManagerProvider indexCheckpointManagerProvider =
Mockito.mock(IIndexCheckpointManagerProvider.class);
IIndexCheckpointManager indexCheckpointManager = Mockito.mock(IIndexCheckpointManager.class);
- Mockito.doNothing().when(indexCheckpointManager).flushed(Mockito.any(), Mockito.anyLong());
+ Mockito.doNothing().when(indexCheckpointManager).flushed(Mockito.any(), Mockito.anyLong(), Mockito.anyLong());
Mockito.doReturn(indexCheckpointManager).when(indexCheckpointManagerProvider).get(Mockito.any());
return indexCheckpointManagerProvider;
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10a3f21d/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/TestLSMIndexAccessor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/TestLSMIndexAccessor.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/TestLSMIndexAccessor.java
index 9ac143c..c2621d8 100644
--- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/TestLSMIndexAccessor.java
+++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/ioopcallbacks/TestLSMIndexAccessor.java
@@ -141,7 +141,7 @@ public class TestLSMIndexAccessor implements ILSMIndexAccessor {
}
@Override
- public void scheduleReplication(List<ILSMDiskComponent> diskComponents, boolean bulkload, LSMOperationType opType)
+ public void scheduleReplication(List<ILSMDiskComponent> diskComponents, LSMOperationType opType)
throws HyracksDataException {
throw new UnsupportedOperationException();
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10a3f21d/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalFilesIndexCreateOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalFilesIndexCreateOperatorDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalFilesIndexCreateOperatorDescriptor.java
index 79dc396..df4c093 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalFilesIndexCreateOperatorDescriptor.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalFilesIndexCreateOperatorDescriptor.java
@@ -18,8 +18,11 @@
*/
package org.apache.asterix.external.operators;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import org.apache.asterix.common.ioopcallbacks.LSMIOOperationCallback;
import org.apache.asterix.external.indexing.ExternalFile;
import org.apache.asterix.external.indexing.FileIndexTupleTranslator;
import org.apache.hyracks.api.comm.IFrameWriter;
@@ -36,7 +39,8 @@ import org.apache.hyracks.storage.am.common.api.IIndexBuilder;
import org.apache.hyracks.storage.am.common.api.IIndexBuilderFactory;
import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper;
import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
-import org.apache.hyracks.storage.common.IIndex;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId;
import org.apache.hyracks.storage.common.IIndexBulkLoader;
/**
@@ -74,10 +78,13 @@ public class ExternalFilesIndexCreateOperatorDescriptor extends AbstractSingleAc
// Open the index
indexHelper.open();
try {
- IIndex index = indexHelper.getIndexInstance();
+ ILSMIndex index = (ILSMIndex) indexHelper.getIndexInstance();
+ Map<String, Object> parameters = new HashMap<>();
+ parameters.put(LSMIOOperationCallback.KEY_FLUSHED_COMPONENT_ID,
+ LSMComponentId.DEFAULT_COMPONENT_ID);
// Create bulk loader
IIndexBulkLoader bulkLoader =
- index.createBulkLoader(BTree.DEFAULT_FILL_FACTOR, false, files.size(), false);
+ index.createBulkLoader(BTree.DEFAULT_FILL_FACTOR, false, files.size(), false, parameters);
// Load files
for (ExternalFile file : files) {
bulkLoader.add(filesTupleTranslator.getTupleFromFile(file));
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10a3f21d/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalFilesIndexModificationOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalFilesIndexModificationOperatorDescriptor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalFilesIndexModificationOperatorDescriptor.java
index 4bc2867..3bada4a 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalFilesIndexModificationOperatorDescriptor.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalFilesIndexModificationOperatorDescriptor.java
@@ -18,7 +18,9 @@
*/
package org.apache.asterix.external.operators;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.external.indexing.ExternalFile;
@@ -69,9 +71,10 @@ public class ExternalFilesIndexModificationOperatorDescriptor extends AbstractSi
indexHelper.open();
IIndex index = indexHelper.getIndexInstance();
LSMTwoPCBTreeBulkLoader bulkLoader = null;
+ Map<String, Object> parameters = new HashMap<>();
try {
bulkLoader = (LSMTwoPCBTreeBulkLoader) ((ExternalBTree) index)
- .createTransactionBulkLoader(BTree.DEFAULT_FILL_FACTOR, false, files.size());
+ .createTransactionBulkLoader(BTree.DEFAULT_FILL_FACTOR, false, files.size(), parameters);
// Load files
// The files must be ordered according to their numbers
for (ExternalFile file : files) {
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10a3f21d/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkLoadOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkLoadOperatorNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkLoadOperatorNodePushable.java
index 573de5d..74bc0dc 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkLoadOperatorNodePushable.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkLoadOperatorNodePushable.java
@@ -18,12 +18,18 @@
*/
package org.apache.asterix.external.operators;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.asterix.common.ioopcallbacks.LSMIOOperationCallback;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
import org.apache.hyracks.storage.am.common.dataflow.IndexBulkLoadOperatorNodePushable;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
import org.apache.hyracks.storage.am.lsm.common.api.ITwoPCIndex;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId;
public class ExternalIndexBulkLoadOperatorNodePushable extends IndexBulkLoadOperatorNodePushable {
@@ -43,4 +49,12 @@ public class ExternalIndexBulkLoadOperatorNodePushable extends IndexBulkLoadOper
super.open();
((ITwoPCIndex) index).setCurrentVersion(version);
}
+
+ @Override
+ protected void initializeBulkLoader() throws HyracksDataException {
+ Map<String, Object> parameters = new HashMap<>();
+ parameters.put(LSMIOOperationCallback.KEY_FLUSHED_COMPONENT_ID, LSMComponentId.DEFAULT_COMPONENT_ID);
+ bulkLoader = ((ILSMIndex) index).createBulkLoader(fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex,
+ parameters);
+ }
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10a3f21d/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkModifyOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkModifyOperatorNodePushable.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkModifyOperatorNodePushable.java
index aaca3f1..57e2917 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkModifyOperatorNodePushable.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalIndexBulkModifyOperatorNodePushable.java
@@ -19,6 +19,8 @@
package org.apache.asterix.external.operators;
import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
import org.apache.asterix.external.indexing.FilesIndexDescription;
import org.apache.asterix.om.base.AMutableInt32;
@@ -60,8 +62,9 @@ public class ExternalIndexBulkModifyOperatorNodePushable extends IndexBulkLoadOp
try {
writer.open();
// Transactional BulkLoader
- bulkLoader =
- ((ITwoPCIndex) index).createTransactionBulkLoader(fillFactor, verifyInput, deletedFiles.length);
+ Map<String, Object> parameters = new HashMap<>();
+ bulkLoader = ((ITwoPCIndex) index).createTransactionBulkLoader(fillFactor, verifyInput, deletedFiles.length,
+ parameters);
// Delete files
for (int i = 0; i < deletedFiles.length; i++) {
fileNumber.setValue(deletedFiles[i]);
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10a3f21d/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java
index 84922cd..d4d601c 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java
@@ -22,16 +22,20 @@ import java.io.DataInput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
+import java.nio.file.Path;
import java.util.Collection;
import org.apache.asterix.common.api.INcApplicationContext;
-import org.apache.asterix.replication.api.IReplicationWorker;
import org.apache.asterix.common.storage.DatasetResourceReference;
import org.apache.asterix.common.storage.IIndexCheckpointManager;
import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
+import org.apache.asterix.common.utils.StoragePathUtil;
import org.apache.asterix.replication.api.IReplicaTask;
+import org.apache.asterix.replication.api.IReplicationWorker;
import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.IIOManager;
+import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager;
import org.apache.hyracks.storage.common.LocalResource;
/**
@@ -51,13 +55,27 @@ public class CheckpointPartitionIndexesTask implements IReplicaTask {
appCtx.getIndexCheckpointManagerProvider();
PersistentLocalResourceRepository resRepo =
(PersistentLocalResourceRepository) appCtx.getLocalResourceRepository();
+ final IIOManager ioManager = appCtx.getIoManager();
final Collection<LocalResource> partitionResources = resRepo.getPartitionResources(partition).values();
final long currentLSN = appCtx.getTransactionSubsystem().getLogManager().getAppendLSN();
for (LocalResource ls : partitionResources) {
- final IIndexCheckpointManager indexCheckpointManager =
- indexCheckpointManagerProvider.get(DatasetResourceReference.of(ls));
+ DatasetResourceReference ref = DatasetResourceReference.of(ls);
+ final IIndexCheckpointManager indexCheckpointManager = indexCheckpointManagerProvider.get(ref);
indexCheckpointManager.delete();
- indexCheckpointManager.init(currentLSN);
+ // Get most recent timestamp of existing files to avoid deletion
+ Path indexPath = StoragePathUtil.getIndexPath(ioManager, ref);
+ String[] files = indexPath.toFile().list(AbstractLSMIndexFileManager.COMPONENT_FILES_FILTER);
+ if (files == null) {
+ throw HyracksDataException
+ .create(new IOException(indexPath + " is not a directory or an IO Error occurred"));
+ }
+ String mostRecentTimestamp = null;
+ for (String file : files) {
+ String nextTimeStamp = AbstractLSMIndexFileManager.getComponentEndTime(file);
+ mostRecentTimestamp = mostRecentTimestamp == null || nextTimeStamp.compareTo(mostRecentTimestamp) > 0
+ ? nextTimeStamp : mostRecentTimestamp;
+ }
+ indexCheckpointManager.init(mostRecentTimestamp, currentLSN);
}
ReplicationProtocol.sendAck(worker.getChannel(), worker.getReusableBuffer());
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10a3f21d/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/MarkComponentValidTask.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/MarkComponentValidTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/MarkComponentValidTask.java
index 57474ef..a4f9b43 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/MarkComponentValidTask.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/MarkComponentValidTask.java
@@ -34,6 +34,7 @@ import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
import org.apache.asterix.common.storage.ResourceReference;
import org.apache.asterix.replication.api.IReplicaTask;
import org.apache.asterix.replication.api.IReplicationWorker;
+import org.apache.asterix.replication.sync.IndexSynchronizer;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager;
@@ -43,17 +44,21 @@ import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManage
public class MarkComponentValidTask implements IReplicaTask {
private final long masterLsn;
+ private final long lastComponentId;
private final String file;
- public MarkComponentValidTask(String file, long masterLsn) {
+ public MarkComponentValidTask(String file, long masterLsn, long lastComponentId) {
this.file = file;
+ this.lastComponentId = lastComponentId;
this.masterLsn = masterLsn;
}
@Override
public void perform(INcApplicationContext appCtx, IReplicationWorker worker) {
try {
- if (masterLsn > 0) {
+ if (masterLsn == IndexSynchronizer.BULKLOAD_LSN) {
+ updateBulkLoadedLastComponentTimestamp(appCtx);
+ } else if (masterLsn != IndexSynchronizer.MERGE_LSN) {
ensureComponentLsnFlushed(appCtx);
}
// delete mask
@@ -65,6 +70,15 @@ public class MarkComponentValidTask implements IReplicaTask {
}
}
+ private void updateBulkLoadedLastComponentTimestamp(INcApplicationContext appCtx) throws HyracksDataException {
+ final ResourceReference indexRef = ResourceReference.of(file);
+ final IIndexCheckpointManagerProvider checkpointManagerProvider = appCtx.getIndexCheckpointManagerProvider();
+ final IIndexCheckpointManager indexCheckpointManager = checkpointManagerProvider.get(indexRef);
+ final String componentEndTime = AbstractLSMIndexFileManager.getComponentEndTime(indexRef.getName());
+ indexCheckpointManager.advanceValidComponentTimestamp(componentEndTime);
+
+ }
+
private void ensureComponentLsnFlushed(INcApplicationContext appCtx)
throws HyracksDataException, InterruptedException {
final ResourceReference indexRef = ResourceReference.of(file);
@@ -82,7 +96,7 @@ public class MarkComponentValidTask implements IReplicaTask {
replicationTimeOut -= TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime);
}
final String componentEndTime = AbstractLSMIndexFileManager.getComponentEndTime(indexRef.getName());
- indexCheckpointManager.replicated(componentEndTime, masterLsn);
+ indexCheckpointManager.replicated(componentEndTime, masterLsn, lastComponentId);
}
}
@@ -97,6 +111,7 @@ public class MarkComponentValidTask implements IReplicaTask {
final DataOutputStream dos = new DataOutputStream(out);
dos.writeUTF(file);
dos.writeLong(masterLsn);
+ dos.writeLong(lastComponentId);
} catch (IOException e) {
throw HyracksDataException.create(e);
}
@@ -105,6 +120,7 @@ public class MarkComponentValidTask implements IReplicaTask {
public static MarkComponentValidTask create(DataInput input) throws IOException {
final String indexFile = input.readUTF();
final long lsn = input.readLong();
- return new MarkComponentValidTask(indexFile, lsn);
+ final long lastComponentId = input.readLong();
+ return new MarkComponentValidTask(indexFile, lsn, lastComponentId);
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10a3f21d/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java
index ca0fcca..20663d1 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java
@@ -30,12 +30,12 @@ import java.nio.file.Paths;
import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.asterix.common.exceptions.ReplicationException;
-import org.apache.asterix.replication.api.IReplicationWorker;
import org.apache.asterix.common.storage.IIndexCheckpointManager;
import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
import org.apache.asterix.common.storage.ResourceReference;
import org.apache.asterix.common.utils.StorageConstants;
import org.apache.asterix.replication.api.IReplicaTask;
+import org.apache.asterix.replication.api.IReplicationWorker;
import org.apache.asterix.replication.management.NetworkingUtil;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
@@ -98,7 +98,7 @@ public class ReplicateFileTask implements IReplicaTask {
final IIndexCheckpointManager indexCheckpointManager = checkpointManagerProvider.get(indexRef);
final long currentLSN = appCtx.getTransactionSubsystem().getLogManager().getAppendLSN();
indexCheckpointManager.delete();
- indexCheckpointManager.init(currentLSN);
+ indexCheckpointManager.init(null, currentLSN);
LOGGER.info(() -> "Checkpoint index: " + indexRef);
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10a3f21d/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/IndexSynchronizer.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/IndexSynchronizer.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/IndexSynchronizer.java
index 0e07ac7..30a5595 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/IndexSynchronizer.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/IndexSynchronizer.java
@@ -37,12 +37,15 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexReplicationJob;
import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public class IndexSynchronizer {
private static final Logger LOGGER = LogManager.getLogger();
+ public static final long MERGE_LSN = -1;
+ public static final long BULKLOAD_LSN = -2;
private final IReplicationJob job;
private final INcApplicationContext appCtx;
@@ -91,7 +94,8 @@ public class IndexSynchronizer {
final FileSynchronizer fileSynchronizer = new FileSynchronizer(appCtx, replica);
job.getJobFiles().stream().map(StoragePathUtil::getFileRelativePath).forEach(fileSynchronizer::replicate);
// send mark component valid
- MarkComponentValidTask markValidTask = new MarkComponentValidTask(indexFile, getReplicatedComponentLsn());
+ MarkComponentValidTask markValidTask =
+ new MarkComponentValidTask(indexFile, getReplicatedComponentLsn(), getReplicatedComponentId());
ReplicationProtocol.sendTo(replica, markValidTask);
ReplicationProtocol.waitForAck(replica);
LOGGER.debug("Replicated component ({}) to replica {}", indexFile, replica);
@@ -118,6 +122,12 @@ public class IndexSynchronizer {
private long getReplicatedComponentLsn() throws HyracksDataException {
final ILSMIndexReplicationJob indexReplJob = (ILSMIndexReplicationJob) job;
+ if (indexReplJob.getLSMOpType() == LSMOperationType.MERGE) {
+ return MERGE_LSN;
+ } else if (indexReplJob.getLSMOpType() == LSMOperationType.LOAD) {
+ return BULKLOAD_LSN;
+ }
+
if (indexReplJob.getLSMOpType() != LSMOperationType.FLUSH) {
return LSMIOOperationCallback.INVALID_LSN;
}
@@ -126,4 +136,14 @@ public class IndexSynchronizer {
return ((LSMIOOperationCallback) lsmIndex.getIOOperationCallback())
.getComponentLSN(ctx.getComponentsToBeReplicated());
}
+
+ private long getReplicatedComponentId() throws HyracksDataException {
+ final ILSMIndexReplicationJob indexReplJob = (ILSMIndexReplicationJob) job;
+ if (indexReplJob.getLSMOpType() != LSMOperationType.FLUSH) {
+ return -1L;
+ }
+ final ILSMIndexOperationContext ctx = indexReplJob.getLSMIndexOperationContext();
+ LSMComponentId id = (LSMComponentId) ctx.getComponentsToBeReplicated().get(0).getId();
+ return id.getMinId();
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10a3f21d/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMIndexBulkLoadOperatorNodePushable.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMIndexBulkLoadOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMIndexBulkLoadOperatorNodePushable.java
index 2415556..4130490 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMIndexBulkLoadOperatorNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMIndexBulkLoadOperatorNodePushable.java
@@ -18,10 +18,13 @@
*/
package org.apache.asterix.runtime.operators;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import org.apache.asterix.common.api.IDatasetLifecycleManager;
import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.ioopcallbacks.LSMIOOperationCallback;
import org.apache.asterix.runtime.operators.LSMIndexBulkLoadOperatorDescriptor.BulkLoadUsage;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
@@ -33,7 +36,6 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId;
-import org.apache.hyracks.storage.am.lsm.common.impls.LSMIndexDiskComponentBulkLoader;
import org.apache.hyracks.storage.am.lsm.common.util.LSMComponentIdUtils;
public class LSMIndexBulkLoadOperatorNodePushable extends IndexBulkLoadOperatorNodePushable {
@@ -71,29 +73,26 @@ public class LSMIndexBulkLoadOperatorNodePushable extends IndexBulkLoadOperatorN
@Override
protected void initializeBulkLoader() throws HyracksDataException {
ILSMIndex targetIndex = (ILSMIndex) index;
+ Map<String, Object> parameters = new HashMap<>();
+ parameters.put(LSMIOOperationCallback.KEY_FLUSHED_COMPONENT_ID, LSMComponentId.DEFAULT_COMPONENT_ID);
if (usage.equals(BulkLoadUsage.LOAD)) {
- // for a loaded dataset, we use the default Id 0 which is guaranteed to be smaller
- // than Ids of all memory components
-
- // TODO handle component Id for datasets loaded multiple times
- // TODO move this piece of code to io operation callback
- bulkLoader = targetIndex.createBulkLoader(fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex);
- ILSMDiskComponent diskComponent = ((LSMIndexDiskComponentBulkLoader) bulkLoader).getComponent();
- LSMComponentIdUtils.persist(LSMComponentId.DEFAULT_COMPONENT_ID, diskComponent.getMetadata());
+ bulkLoader = targetIndex.createBulkLoader(fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex,
+ parameters);
} else {
primaryIndexHelper.open();
primaryIndex = (ILSMIndex) primaryIndexHelper.getIndexInstance();
List<ILSMDiskComponent> primaryComponents = primaryIndex.getDiskComponents();
- bulkLoader = targetIndex.createBulkLoader(fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex);
if (!primaryComponents.isEmpty()) {
- // TODO move this piece of code to io operation callback
- // Ideally, this should be done in io operation callback when a bulk load operation is finished
- // However, currently we don't have an extensible callback mechanism to support this
ILSMComponentId bulkloadId = LSMComponentIdUtils.union(primaryComponents.get(0).getId(),
primaryComponents.get(primaryComponents.size() - 1).getId());
- ILSMDiskComponent diskComponent = ((LSMIndexDiskComponentBulkLoader) bulkLoader).getComponent();
- LSMComponentIdUtils.persist(bulkloadId, diskComponent.getMetadata());
+ parameters.put(LSMIOOperationCallback.KEY_FLUSHED_COMPONENT_ID, bulkloadId);
+ } else {
+ parameters.put(LSMIOOperationCallback.KEY_FLUSHED_COMPONENT_ID,
+ LSMComponentId.EMPTY_INDEX_LAST_COMPONENT_ID);
}
+ bulkLoader = targetIndex.createBulkLoader(fillFactor, verifyInput, numElementsHint, checkIfEmptyIndex,
+ parameters);
+
}
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10a3f21d/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadNodePushable.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadNodePushable.java
index 6d9ec47..1029d6f 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadNodePushable.java
@@ -19,7 +19,10 @@
package org.apache.asterix.runtime.operators;
import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.asterix.common.ioopcallbacks.LSMIOOperationCallback;
import org.apache.asterix.runtime.operators.LSMSecondaryIndexCreationTupleProcessorNodePushable.DeletedTupleCounter;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
@@ -32,9 +35,9 @@ import org.apache.hyracks.storage.am.common.tuples.PermutingTupleReference;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
import org.apache.hyracks.storage.am.lsm.common.impls.LSMIndexDiskComponentBulkLoader;
-import org.apache.hyracks.storage.am.lsm.common.util.LSMComponentIdUtils;
/**
+ * Note: only used with correlated merge policy
* This operator node is used to bulk load incoming tuples (scanned from the primary index)
* into multiple disk components of the secondary index.
* Incoming tuple format:
@@ -182,14 +185,12 @@ public class LSMSecondaryIndexBulkLoadNodePushable extends AbstractLSMSecondaryI
private void loadNewComponent(int componentPos) throws HyracksDataException {
endCurrentComponent();
-
int numTuples = getNumDeletedTuples(componentPos);
ILSMDiskComponent primaryComponent = primaryIndex.getDiskComponents().get(componentPos);
- componentBulkLoader =
- (LSMIndexDiskComponentBulkLoader) secondaryIndex.createBulkLoader(1.0f, false, numTuples, false);
- ILSMDiskComponent diskComponent = componentBulkLoader.getComponent();
- // TODO move this piece of code to io operation callback
- LSMComponentIdUtils.persist(primaryComponent.getId(), diskComponent.getMetadata());
+ Map<String, Object> parameters = new HashMap<>();
+ parameters.put(LSMIOOperationCallback.KEY_FLUSHED_COMPONENT_ID, primaryComponent.getId());
+ componentBulkLoader = (LSMIndexDiskComponentBulkLoader) secondaryIndex.createBulkLoader(1.0f, false, numTuples,
+ false, parameters);
}
private void addAntiMatterTuple(ITupleReference tuple) throws HyracksDataException {
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10a3f21d/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/std/FlushDatasetOperatorDescriptor.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/std/FlushDatasetOperatorDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/std/FlushDatasetOperatorDescriptor.java
index d61e9a0..7e42d14 100644
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/std/FlushDatasetOperatorDescriptor.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/std/FlushDatasetOperatorDescriptor.java
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer;
import org.apache.asterix.common.api.IDatasetLifecycleManager;
import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.context.DatasetInfo;
import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.transactions.DatasetId;
import org.apache.asterix.common.transactions.ILockManager;
@@ -82,7 +83,13 @@ public class FlushDatasetOperatorDescriptor extends AbstractSingleActivityOperat
// lock the dataset granule
lockManager.lock(datasetId, -1, LockMode.S, txnCtx);
// flush the dataset synchronously
- datasetLifeCycleManager.flushDataset(datasetId.getId(), false);
+ DatasetInfo datasetInfo = datasetLifeCycleManager.getDatasetInfo(datasetId.getId());
+ // TODO: Remove the isOpen check and let it fail if flush is requested for a dataset that is closed
+ synchronized (datasetLifeCycleManager) {
+ if (datasetInfo.isOpen()) {
+ datasetLifeCycleManager.flushDataset(datasetId.getId(), false);
+ }
+ }
} catch (ACIDException e) {
throw HyracksDataException.create(e);
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10a3f21d/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index 0375c30..e1963cb 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -186,7 +186,7 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito
}
resourceCache.put(resource.getPath(), resource);
- indexCheckpointManagerProvider.get(DatasetResourceReference.of(resource)).init(0);
+ indexCheckpointManagerProvider.get(DatasetResourceReference.of(resource)).init(null, 0);
//if replication enabled, send resource metadata info to remote nodes
if (isReplicationEnabled) {
createReplicationJob(ReplicationOperation.REPLICATE, resourceFile);
@@ -429,15 +429,20 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito
}
private void deleteIndexInvalidComponents(File index) throws IOException, ParseException {
- final Optional<String> validComponentTimestamp = getIndexCheckpointManager(index).getValidComponentTimestamp();
- if (!validComponentTimestamp.isPresent()) {
- // index doesn't have any components
- return;
- }
final Format formatter = THREAD_LOCAL_FORMATTER.get();
- final Date validTimestamp = (Date) formatter.parseObject(validComponentTimestamp.get());
final File[] indexComponentFiles = index.listFiles(COMPONENT_FILES_FILTER);
- if (indexComponentFiles != null) {
+ if (indexComponentFiles == null) {
+ throw new IOException(index + " doesn't exist or an IO error occurred");
+ }
+ final Optional<String> validComponentTimestamp = getIndexCheckpointManager(index).getValidComponentTimestamp();
+ if (!validComponentTimestamp.isPresent()) {
+ // index doesn't have any valid component, delete all
+ for (File componentFile : indexComponentFiles) {
+ LOGGER.info(() -> "Deleting invalid component file: " + componentFile.getAbsolutePath());
+ Files.delete(componentFile.toPath());
+ }
+ } else {
+ final Date validTimestamp = (Date) formatter.parseObject(validComponentTimestamp.get());
for (File componentFile : indexComponentFiles) {
// delete any file with startTime > validTimestamp
final String fileStartTimeStr =
@@ -505,7 +510,8 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito
* e.g. a component file 2018-01-08-01-08-50-439_2018-01-08-01-08-50-439_b
* will return a component id 2018-01-08-01-08-50-439_2018-01-08-01-08-50-439
*
- * @param componentFile any component file
+ * @param componentFile
+ * any component file
* @return The component id
*/
public static String getComponentId(String componentFile) {
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10a3f21d/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
index 21268e5..4085fb4 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
@@ -89,8 +89,7 @@ public class LogBuffer implements ILogBuffer {
public void append(ILogRecord logRecord, long appendLsn) {
logRecord.writeLogRecord(appendBuffer);
- if (logRecord.getLogSource() == LogSource.LOCAL && logRecord.getLogType() != LogType.FLUSH
- && logRecord.getLogType() != LogType.WAIT) {
+ if (isLocalTransactionLog(logRecord)) {
logRecord.getTxnCtx().setLastLSN(appendLsn);
}
@@ -100,13 +99,10 @@ public class LogBuffer implements ILogBuffer {
LOGGER.info("append()| appendOffset: " + appendOffset);
}
if (logRecord.getLogSource() == LogSource.LOCAL) {
- if (logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT
- || logRecord.getLogType() == LogType.WAIT) {
+ if (syncPendingNonFlushLog(logRecord)) {
logRecord.isFlushed(false);
syncCommitQ.add(logRecord);
- }
- if (logRecord.getLogType() == LogType.FLUSH) {
- logRecord.isFlushed(false);
+ } else if (logRecord.getLogType() == LogType.FLUSH) {
flushQ.add(logRecord);
}
} else if (logRecord.getLogSource() == LogSource.REMOTE && (logRecord.getLogType() == LogType.JOB_COMMIT
@@ -117,6 +113,16 @@ public class LogBuffer implements ILogBuffer {
}
}
+ private boolean syncPendingNonFlushLog(ILogRecord logRecord) {
+ return logRecord.getLogType() == LogType.JOB_COMMIT || logRecord.getLogType() == LogType.ABORT
+ || logRecord.getLogType() == LogType.WAIT || logRecord.getLogType() == LogType.WAIT_FOR_FLUSHES;
+ }
+
+ private boolean isLocalTransactionLog(ILogRecord logRecord) {
+ return logRecord.getLogSource() == LogSource.LOCAL && logRecord.getLogType() != LogType.FLUSH
+ && logRecord.getLogType() != LogType.WAIT && logRecord.getLogType() != LogType.WAIT_FOR_FLUSHES;
+ }
+
@Override
public void setFileChannel(FileChannel fileChannel) {
this.fileChannel = fileChannel;
@@ -231,7 +237,8 @@ public class LogBuffer implements ILogBuffer {
notifyJobTermination();
} else if (logRecord.getLogType() == LogType.FLUSH) {
notifyFlushTermination();
- } else if (logRecord.getLogType() == LogType.WAIT) {
+ } else if (logRecord.getLogType() == LogType.WAIT
+ || logRecord.getLogType() == LogType.WAIT_FOR_FLUSHES) {
notifyWaitTermination();
}
} else if (logRecord.getLogSource() == LogSource.REMOTE && (logRecord.getLogType() == LogType.JOB_COMMIT
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10a3f21d/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
index 736de07..be227ec 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java
@@ -32,7 +32,6 @@ import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
-import java.util.HashMap;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
@@ -134,13 +133,33 @@ public class LogManager implements ILogManager, ILifeCycleComponent {
@Override
public void log(ILogRecord logRecord) {
- if (logRecord.getLogType() == LogType.FLUSH) {
+ if (!logToFlushQueue(logRecord)) {
+ appendToLogTail(logRecord);
+ }
+ }
+
+ @SuppressWarnings("squid:S2445")
+ protected boolean logToFlushQueue(ILogRecord logRecord) {
+ //Remote flush logs do not need to be flushed separately since they may not trigger local flush
+ if ((logRecord.getLogType() == LogType.FLUSH && logRecord.getLogSource() == LogSource.LOCAL)
+ || logRecord.getLogType() == LogType.WAIT_FOR_FLUSHES) {
+ logRecord.isFlushed(false);
flushLogsQ.add(logRecord);
- return;
+ if (logRecord.getLogType() == LogType.WAIT_FOR_FLUSHES) {
+ InvokeUtil.doUninterruptibly(() -> {
+ synchronized (logRecord) {
+ while (!logRecord.isFlushed()) {
+ logRecord.wait();
+ }
+ }
+ });
+ }
+ return true;
}
- appendToLogTail(logRecord);
+ return false;
}
+ @SuppressWarnings("squid:S2445")
protected void appendToLogTail(ILogRecord logRecord) {
syncAppendToLogTail(logRecord);
if (waitForFlush(logRecord) && !logRecord.isFlushed()) {
@@ -161,7 +180,7 @@ public class LogManager implements ILogManager, ILifeCycleComponent {
synchronized void syncAppendToLogTail(ILogRecord logRecord) {
if (logRecord.getLogSource() == LogSource.LOCAL && logRecord.getLogType() != LogType.FLUSH
- && logRecord.getLogType() != LogType.WAIT) {
+ && logRecord.getLogType() != LogType.WAIT && logRecord.getLogType() != LogType.WAIT_FOR_FLUSHES) {
ITransactionContext txnCtx = logRecord.getTxnCtx();
if (txnCtx.getTxnState() == ITransactionManager.ABORTED && logRecord.getLogType() != LogType.ABORT) {
throw new ACIDException(
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10a3f21d/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
index 1e13883..d2e9629 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
@@ -40,9 +40,11 @@ public class LogManagerWithReplication extends LogManager {
super(txnSubsystem);
}
+ @SuppressWarnings("squid:S2445")
@Override
public void log(ILogRecord logRecord) {
- boolean shouldReplicate = logRecord.getLogSource() == LogSource.LOCAL && logRecord.getLogType() != LogType.WAIT;
+ boolean shouldReplicate = logRecord.getLogSource() == LogSource.LOCAL && logRecord.getLogType() != LogType.WAIT
+ && logRecord.getLogType() != LogType.WAIT_FOR_FLUSHES;
if (shouldReplicate) {
switch (logRecord.getLogType()) {
case LogType.ENTITY_COMMIT:
@@ -63,16 +65,12 @@ public class LogManagerWithReplication extends LogManager {
}
}
logRecord.setReplicate(shouldReplicate);
-
- //Remote flush logs do not need to be flushed separately since they may not trigger local flush
- if (logRecord.getLogType() == LogType.FLUSH && logRecord.getLogSource() == LogSource.LOCAL) {
- flushLogsQ.add(logRecord);
- return;
+ if (!logToFlushQueue(logRecord)) {
+ appendToLogTail(logRecord);
}
-
- appendToLogTail(logRecord);
}
+ @SuppressWarnings("squid:S2445")
@Override
protected void appendToLogTail(ILogRecord logRecord) {
syncAppendToLogTail(logRecord);
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/10a3f21d/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/ITwoPCIndexBulkLoader.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/ITwoPCIndexBulkLoader.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/ITwoPCIndexBulkLoader.java
index 16c0afa..c7be7eb 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/ITwoPCIndexBulkLoader.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/api/ITwoPCIndexBulkLoader.java
@@ -39,6 +39,6 @@ public interface ITwoPCIndexBulkLoader {
/**
* Abort the bulk modify op
*/
- public void abort();
+ public void abort() throws HyracksDataException;
}