You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/07/05 11:23:59 UTC
[17/33] ignite git commit: IGNITE-5558 - Added ability to read WAL in
standalone mode - Fixes #2174.
http://git-wip-us.apache.org/repos/asf/ignite/blob/44f3fac2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
new file mode 100644
index 0000000..df932e6
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
@@ -0,0 +1,499 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.wal.reader;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.GridComponent;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.GridKernalGateway;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.MarshallerContextImpl;
+import org.apache.ignite.internal.managers.checkpoint.GridCheckpointManager;
+import org.apache.ignite.internal.managers.collision.GridCollisionManager;
+import org.apache.ignite.internal.managers.communication.GridIoManager;
+import org.apache.ignite.internal.managers.deployment.GridDeploymentManager;
+import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
+import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
+import org.apache.ignite.internal.managers.failover.GridFailoverManager;
+import org.apache.ignite.internal.managers.indexing.GridIndexingManager;
+import org.apache.ignite.internal.managers.loadbalancer.GridLoadBalancerManager;
+import org.apache.ignite.internal.processors.affinity.GridAffinityProcessor;
+import org.apache.ignite.internal.processors.cache.GridCacheProcessor;
+import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor;
+import org.apache.ignite.internal.processors.closure.GridClosureProcessor;
+import org.apache.ignite.internal.processors.cluster.ClusterProcessor;
+import org.apache.ignite.internal.processors.cluster.GridClusterStateProcessor;
+import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor;
+import org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor;
+import org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor;
+import org.apache.ignite.internal.processors.hadoop.HadoopHelper;
+import org.apache.ignite.internal.processors.hadoop.HadoopProcessorAdapter;
+import org.apache.ignite.internal.processors.igfs.IgfsHelper;
+import org.apache.ignite.internal.processors.igfs.IgfsProcessorAdapter;
+import org.apache.ignite.internal.processors.job.GridJobProcessor;
+import org.apache.ignite.internal.processors.jobmetrics.GridJobMetricsProcessor;
+import org.apache.ignite.internal.processors.marshaller.GridMarshallerMappingProcessor;
+import org.apache.ignite.internal.processors.odbc.SqlListenerProcessor;
+import org.apache.ignite.internal.processors.platform.PlatformProcessor;
+import org.apache.ignite.internal.processors.plugin.IgnitePluginProcessor;
+import org.apache.ignite.internal.processors.pool.PoolProcessor;
+import org.apache.ignite.internal.processors.port.GridPortProcessor;
+import org.apache.ignite.internal.processors.query.GridQueryProcessor;
+import org.apache.ignite.internal.processors.resource.GridResourceProcessor;
+import org.apache.ignite.internal.processors.rest.GridRestProcessor;
+import org.apache.ignite.internal.processors.schedule.IgniteScheduleProcessorAdapter;
+import org.apache.ignite.internal.processors.security.GridSecurityProcessor;
+import org.apache.ignite.internal.processors.segmentation.GridSegmentationProcessor;
+import org.apache.ignite.internal.processors.service.GridServiceProcessor;
+import org.apache.ignite.internal.processors.session.GridTaskSessionProcessor;
+import org.apache.ignite.internal.processors.task.GridTaskProcessor;
+import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
+import org.apache.ignite.internal.suggestions.GridPerformanceSuggestions;
+import org.apache.ignite.internal.util.IgniteExceptionRegistry;
+import org.apache.ignite.internal.util.StripedExecutor;
+import org.apache.ignite.plugin.PluginNotFoundException;
+import org.apache.ignite.plugin.PluginProvider;
+import org.apache.ignite.thread.IgniteStripedThreadPoolExecutor;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Dummy grid kernal context for the standalone WAL reader: only the logger is provided, all other accessors are stubs.
+ */
+public class StandaloneGridKernalContext implements GridKernalContext {
+ private IgniteLogger log;
+
+ /**
+ * @param log Logger.
+ */
+ StandaloneGridKernalContext(IgniteLogger log) {
+ this.log = log;
+ }
+
+ /** {@inheritDoc} */
+ @Override public List<GridComponent> components() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public UUID localNodeId() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String igniteInstanceName() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteLogger log(String ctgr) {
+ return log;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteLogger log(Class<?> cls) {
+ return log;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isStopping() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridKernalGateway gateway() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteEx grid() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteConfiguration config() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridTaskProcessor task() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridAffinityProcessor affinity() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridJobProcessor job() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridTimeoutProcessor timeout() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridResourceProcessor resource() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridJobMetricsProcessor jobMetric() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridCacheProcessor cache() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridClusterStateProcessor state() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridTaskSessionProcessor session() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridClosureProcessor closure() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridServiceProcessor service() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridPortProcessor ports() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteScheduleProcessorAdapter schedule() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridRestProcessor rest() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridSegmentationProcessor segmentation() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public <K, V> DataStreamProcessor<K, V> dataStream() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgfsProcessorAdapter igfs() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgfsHelper igfsHelper() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridContinuousProcessor continuous() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public HadoopProcessorAdapter hadoop() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public PoolProcessor pools() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridMarshallerMappingProcessor mapping() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public HadoopHelper hadoopHelper() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public ExecutorService utilityCachePool() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteStripedThreadPoolExecutor asyncCallbackPool() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteCacheObjectProcessor cacheObjects() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridQueryProcessor query() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public SqlListenerProcessor sqlListener() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgnitePluginProcessor plugins() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridDeploymentManager deploy() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridIoManager io() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridDiscoveryManager discovery() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridCheckpointManager checkpoint() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridEventStorageManager event() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridFailoverManager failover() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridCollisionManager collision() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridSecurityProcessor security() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridLoadBalancerManager loadBalancing() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridIndexingManager indexing() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public DataStructuresProcessor dataStructures() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void markSegmented() { }
+
+ /** {@inheritDoc} */
+ @Override public boolean segmented() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void printMemoryStats() { }
+
+ /** {@inheritDoc} */
+ @Override public boolean isDaemon() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridPerformanceSuggestions performance() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String userVersion(ClassLoader ldr) {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public PluginProvider pluginProvider(String name) throws PluginNotFoundException {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public <T> T createComponent(Class<T> cls) {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public ExecutorService getExecutorService() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public ExecutorService getServiceExecutorService() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public ExecutorService getSystemExecutorService() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public StripedExecutor getStripedExecutorService() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public ExecutorService getManagementExecutorService() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public ExecutorService getPeerClassLoadingExecutorService() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public ExecutorService getIgfsExecutorService() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public ExecutorService getDataStreamerExecutorService() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public ExecutorService getRestExecutorService() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public ExecutorService getAffinityExecutorService() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public ExecutorService getIndexingExecutorService() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public ExecutorService getQueryExecutorService() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public Map<String, ? extends ExecutorService> customExecutors() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public ExecutorService getSchemaExecutorService() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteExceptionRegistry exceptionRegistry() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Object nodeAttribute(String key) {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean hasNodeAttribute(String key) {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Object addNodeAttribute(String key, Object val) {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Map<String, Object> nodeAttributes() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public ClusterProcessor cluster() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public MarshallerContextImpl marshallerContext() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean clientNode() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean clientDisconnected() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public PlatformProcessor platform() {
+ return null;
+ }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * This context does not register any components, so the returned iterator is always empty.
+     */
+    @NotNull @Override public Iterator<GridComponent> iterator() {
+        // Honour the @NotNull contract: returning null would make any for-each over the context fail with an NPE.
+        return java.util.Collections.<GridComponent>emptyIterator();
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/44f3fac2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneIgniteCacheDatabaseSharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneIgniteCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneIgniteCacheDatabaseSharedManager.java
new file mode 100644
index 0000000..85a8724
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneIgniteCacheDatabaseSharedManager.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.wal.reader;
+
+import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
+
+/**
+ * Database shared manager stub for the standalone WAL reader tool.
+ * It exists so that the tool has a manager class it can create and so that {@link #setPageSize(int)} is
+ * available as a public method; nothing else is overridden here.
+ */
+class StandaloneIgniteCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedManager {
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Declared {@code public} here and delegated unchanged to the parent implementation.
+ */
+ @Override public void setPageSize(int pageSize) {
+ super.setPageSize(pageSize);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/44f3fac2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java
new file mode 100644
index 0000000..f17c112
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.wal.reader;
+
+import java.io.DataInput;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.FileChannel;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.persistence.wal.AbstractWalRecordsIterator;
+import org.apache.ignite.internal.processors.cache.persistence.wal.FileInput;
+import org.apache.ignite.internal.processors.cache.persistence.wal.FileWALPointer;
+import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager;
+import org.apache.ignite.internal.processors.cache.persistence.wal.SegmentEofException;
+import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer;
+import org.apache.ignite.internal.util.typedef.F;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer.HEADER_RECORD_SIZE;
+
+/**
+ * WAL records iterator created by the standalone WAL reader tool.
+ * Operates either over one directory or over an explicit set of segment files; does not provide start and end boundaries.
+ */
+class StandaloneWalRecordsIterator extends AbstractWalRecordsIterator {
+ /** Serialization version. */
+ private static final long serialVersionUID = 0L;
+
+ /** Record buffer size in bytes (2 MiB); passed to the parent iterator by both constructors. */
+ private static final int BUF_SIZE = 2 * 1024 * 1024;
+
+ /**
+ * WAL files directory. Should already contain the 'consistent ID' as a subfolder.
+ * A <code>null</code> value means file-by-file iteration mode.
+ */
+ @Nullable
+ private File walFilesDir;
+
+ /**
+ * File descriptors that remain to be scanned; the head of the list is removed each time a segment is advanced.
+ * A <code>null</code> value means directory scan mode.
+ */
+ @Nullable
+ private List<FileWriteAheadLogManager.FileDescriptor> walFileDescriptors;
+
+ /**
+ * <code>true</code> if this iterator is used for a work directory, <code>false</code> for an archive.
+ * In work directory mode exceptions coming from record reading are not rethrown (the file may be incomplete),
+ * and the index of a file is taken from the file itself, not from the file name.
+ * Always <code>false</code> in directory scan mode.
+ */
+ private boolean workDir;
+
+ /**
+ * Creates an iterator in directory scan mode and advances it to the first record.
+ * Records are read with a {@link RecordV1Serializer} and a buffer of {@link #BUF_SIZE} bytes.
+ *
+ * @param walFilesDir WAL files directory. Should already contain the node consistent ID as a subfolder.
+ * @param log Logger.
+ * @param sharedCtx Shared context.
+ * @throws IgniteCheckedException If initialization or advancing to the first record fails.
+ */
+ StandaloneWalRecordsIterator(
+ @NotNull final File walFilesDir,
+ @NotNull final IgniteLogger log,
+ @NotNull final GridCacheSharedContext sharedCtx) throws IgniteCheckedException {
+ super(log,
+ sharedCtx,
+ new RecordV1Serializer(sharedCtx),
+ BUF_SIZE);
+ // Directory mode: the work-dir flag and the file array are not used.
+ init(walFilesDir, false, null);
+ advance();
+ }
+
+ /**
+ * Creates an iterator in file-by-file iteration mode and advances it to the first record.
+ * No directory is scanned in this mode; only the given files are read.
+ *
+ * @param log Logger.
+ * @param sharedCtx Shared context.
+ * @param workDir <code>true</code> if the files come from a work directory, <code>false</code> if from an archive.
+ * @param walFiles WAL segment files to iterate over.
+ * @throws IgniteCheckedException If initialization or advancing to the first record fails.
+ */
+ StandaloneWalRecordsIterator(
+ @NotNull final IgniteLogger log,
+ @NotNull final GridCacheSharedContext sharedCtx,
+ final boolean workDir,
+ @NotNull final File... walFiles) throws IgniteCheckedException {
+ super(log,
+ sharedCtx,
+ new RecordV1Serializer(sharedCtx),
+ BUF_SIZE);
+ this.workDir = workDir;
+ // A null directory selects file-by-file mode in init().
+ init(null, workDir, walFiles);
+ advance();
+ }
+
+ /**
+ * For directory mode sets oldest file as initial segment,
+ * for file by file mode, converts all files to descriptors and gets oldest as initial.
+ *
+ * @param walFilesDir directory for directory scan mode
+ * @param workDir work directory, only for file-by-file mode
+ * @param walFiles files for file-by-file iteration mode
+ */
+ private void init(
+ @Nullable final File walFilesDir,
+ final boolean workDir,
+ @Nullable final File[] walFiles) throws IgniteCheckedException {
+ if (walFilesDir != null) {
+ FileWriteAheadLogManager.FileDescriptor[] descs = loadFileDescriptors(walFilesDir);
+ curWalSegmIdx = !F.isEmpty(descs) ? descs[0].getIdx() : 0;
+ this.walFilesDir = walFilesDir;
+ this.workDir = false;
+ }
+ else {
+ this.workDir = workDir;
+ if (workDir)
+ walFileDescriptors = scanIndexesFromFileHeaders(walFiles);
+ else
+ walFileDescriptors = new ArrayList<>(Arrays.asList(FileWriteAheadLogManager.scan(walFiles)));
+ curWalSegmIdx = !walFileDescriptors.isEmpty() ? walFileDescriptors.get(0).getIdx() : 0;
+ }
+ curWalSegmIdx--;
+
+ if (log.isDebugEnabled())
+ log.debug("Initialized WAL cursor [curWalSegmIdx=" + curWalSegmIdx + ']');
+ }
+
+    /**
+     * Checks every provided file for being a readable WAL segment and builds descriptors for the ones that are.
+     * Only the start of the header record is inspected: its record type and the WAL pointer stored right after it.
+     * The WAL pointer is used to determine the real segment index; the index from the file name is ignored.
+     * <p>
+     * Two kinds of files are skipped instead of failing the whole scan: files shorter than
+     * {@link RecordV1Serializer#HEADER_RECORD_SIZE}, and files whose first byte is the stop-iteration marker,
+     * i.e. the logical end of the segment is at its very beginning and there is no pointer to read.
+     *
+     * @param allFiles Files to scan, may be {@code null} or empty.
+     * @return Sorted list of file descriptors with checked header records, each carrying the index read from the file.
+     *      Empty list if there is nothing to scan.
+     * @throws IgniteCheckedException If an I/O error occurs while reading a header.
+     */
+    private List<FileWriteAheadLogManager.FileDescriptor> scanIndexesFromFileHeaders(
+        @Nullable final File[] allFiles) throws IgniteCheckedException {
+        if (allFiles == null || allFiles.length == 0)
+            return Collections.emptyList();
+
+        final List<FileWriteAheadLogManager.FileDescriptor> resultingDescs = new ArrayList<>(allFiles.length);
+
+        for (File file : allFiles) {
+            if (file.length() < HEADER_RECORD_SIZE)
+                continue; // Too short to contain a header record.
+
+            FileWALPointer ptr;
+
+            try (RandomAccessFile rf = new RandomAccessFile(file, "r")) {
+                final FileChannel ch = rf.getChannel();
+                final ByteBuffer buf = ByteBuffer.allocate(HEADER_RECORD_SIZE);
+
+                buf.order(ByteOrder.nativeOrder());
+
+                final DataInput in = new FileInput(ch, buf);
+
+                // Header record must be agnostic to the serializer version.
+                final int type = in.readUnsignedByte();
+
+                if (type == WALRecord.RecordType.STOP_ITERATION_RECORD_TYPE) {
+                    // One segment without records must not abort scanning of the remaining files.
+                    if (log.isDebugEnabled())
+                        log.debug("Skipping WAL segment without records [file=" + file + ']');
+
+                    continue;
+                }
+
+                ptr = RecordV1Serializer.readPosition(in);
+            }
+            catch (IOException e) {
+                throw new IgniteCheckedException("Failed to scan index from file [" + file + "]", e);
+            }
+
+            resultingDescs.add(new FileWriteAheadLogManager.FileDescriptor(file, ptr.index()));
+        }
+
+        Collections.sort(resultingDescs);
+
+        return resultingDescs;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Closes the given segment handle, if any, increments the segment index and opens the next segment.
+     * In directory scan mode the next file name is derived from the incremented index; in file-by-file mode the
+     * next prepared descriptor is taken from the head of the list.
+     *
+     * @return Read handle for the next segment, or {@code null} if there are no more files to read or the
+     *      segment file is not found.
+     */
+    @Override protected FileWriteAheadLogManager.ReadFileHandle advanceSegment(
+        @Nullable final FileWriteAheadLogManager.ReadFileHandle curWalSegment) throws IgniteCheckedException {
+        if (curWalSegment != null)
+            curWalSegment.close();
+
+        curWalSegmIdx++;
+
+        final FileWriteAheadLogManager.FileDescriptor fd;
+
+        if (walFilesDir != null) {
+            // Directory scan mode: the segment file name is built from the running index.
+            fd = new FileWriteAheadLogManager.FileDescriptor(
+                new File(walFilesDir,
+                    FileWriteAheadLogManager.FileDescriptor.fileName(curWalSegmIdx)));
+        }
+        else {
+            if (walFileDescriptors.isEmpty())
+                return null; // No files to read, stop iteration.
+
+            fd = walFileDescriptors.remove(0);
+        }
+
+        // Checked before the first dereference of fd; placed after it the assertion could never fire.
+        assert fd != null;
+
+        if (log.isDebugEnabled())
+            log.debug("Reading next file [absIdx=" + curWalSegmIdx + ", file=" + fd.getAbsolutePath() + ']');
+
+        curRec = null;
+
+        try {
+            return initReadHandle(fd, null);
+        }
+        catch (FileNotFoundException e) {
+            log.info("Missing WAL segment in the archive: " + e.getMessage());
+
+            return null;
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * After delegating to the parent handler, the failure is wrapped into a {@link RuntimeException} that names
+     * the file pointer and is reported through the iterator's logger. For an archive the wrapped failure is then
+     * rethrown; for a work directory the method returns normally, because the file may be incomplete.
+     *
+     * @param e Exception raised while reading a record.
+     * @param ptr Pointer at which the problem occurred, may be {@code null}.
+     */
+    @Override protected void handleRecordException(
+        @NotNull final Exception e,
+        @Nullable final FileWALPointer ptr) {
+        super.handleRecordException(e, ptr);
+
+        final RuntimeException ex = new RuntimeException(
+            "Record reading problem occurred at file pointer [" + ptr + "]:" + e.getMessage(), e);
+
+        // Report via the logger supplied to the iterator rather than dumping the stack trace to stderr.
+        log.error(ex.getMessage(), ex);
+
+        if (!workDir)
+            throw ex;
+    }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * After the parent clean-up, drops the current record, calls {@code closeCurrentWalSegment()} and moves the
+ * segment index to {@link Integer#MAX_VALUE}.
+ */
+ @Override protected void onClose() throws IgniteCheckedException {
+ super.onClose();
+ curRec = null;
+
+ closeCurrentWalSegment();
+
+ // NOTE(review): presumably marks the iterator as exhausted so that no further segment is looked up - confirm.
+ curWalSegmIdx = Integer.MAX_VALUE;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/44f3fac2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java
index 0ccd3a0..0a7b3dd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/serializer/RecordV1Serializer.java
@@ -103,21 +103,37 @@ import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_SKIP_CRC;
/**
* Record V1 serializer.
+ * Stores records in following format:
+ * <ul>
+ * <li>Record type from {@link RecordType#ordinal()} incremented by 1</li>
+ * <li>WAL pointer to double check consistency</li>
+ * <li>Data</li>
+ * <li>CRC or zero padding</li>
+ * </ul>
*/
public class RecordV1Serializer implements RecordSerializer {
- /** */
- public static final int HEADER_RECORD_SIZE = /*Type*/1 + /*Pointer */12 + /*Magic*/8 + /*Version*/4 + /*CRC*/4;
+ /** Length of Type */
+ public static final int REC_TYPE_SIZE = 1;
+
+ /** Length of WAL Pointer */
+ public static final int FILE_WAL_POINTER_SIZE = 12;
+
+ /** Length of CRC value */
+ private static final int CRC_SIZE = 4;
/** */
+ public static final int HEADER_RECORD_SIZE = REC_TYPE_SIZE + FILE_WAL_POINTER_SIZE + /*Magic*/8 + /*Version*/4 + CRC_SIZE;
+
+ /** Cache shared context */
private GridCacheSharedContext cctx;
- /** */
+ /** Size of page used for PageMemory regions */
private int pageSize;
- /** */
+ /** Cache object processor to reading {@link DataEntry DataEntries} */
private IgniteCacheObjectProcessor co;
- /** */
+ /** Skip CRC calculation/check flag */
private boolean skipCrc = IgniteSystemProperties.getBoolean(IGNITE_PDS_SKIP_CRC, false);
/**
@@ -658,7 +674,7 @@ public class RecordV1Serializer implements RecordSerializer {
assert res != null;
- res.size((int)(in0.position() - startPos + 4)); // Account for CRC which will be read afterwards.
+ res.size((int)(in0.position() - startPos + CRC_SIZE)); // Account for CRC which will be read afterwards.
return res;
}
@@ -671,12 +687,16 @@ public class RecordV1Serializer implements RecordSerializer {
}
/**
- * @param in In.
+ * Loads record from input, does not read CRC value
+ *
+ * @param in Input to read record from
+ * @param expPtr expected WAL pointer for record. Used to validate actual position against expected from the file
+ * @throws SegmentEofException if end of WAL segment reached
*/
private WALRecord readRecord(ByteBufferBackedDataInput in, WALPointer expPtr) throws IOException, IgniteCheckedException {
int type = in.readUnsignedByte();
- if (type == 0)
+ if (type == WALRecord.RecordType.STOP_ITERATION_RECORD_TYPE)
throw new SegmentEofException("Reached logical end of the segment", null);
FileWALPointer ptr = readPosition(in);
@@ -1212,7 +1232,7 @@ public class RecordV1Serializer implements RecordSerializer {
/** {@inheritDoc} */
@SuppressWarnings("CastConflictsWithInstanceof")
@Override public int size(WALRecord record) throws IgniteCheckedException {
- int commonFields = /* Type */1 + /* Pointer */12 + /*CRC*/4;
+ int commonFields = REC_TYPE_SIZE + FILE_WAL_POINTER_SIZE + CRC_SIZE;
switch (record.type()) {
case PAGE_RECORD:
@@ -1371,7 +1391,7 @@ public class RecordV1Serializer implements RecordSerializer {
return commonFields + /*cacheId*/ 4 + /*pageId*/ 8;
case SWITCH_SEGMENT_RECORD:
- return commonFields;
+ return commonFields - CRC_SIZE; //CRC is not loaded for switch segment, exception is thrown instead
default:
throw new UnsupportedOperationException("Type: " + record.type());
@@ -1379,10 +1399,11 @@ public class RecordV1Serializer implements RecordSerializer {
}
/**
+ * Saves position, WAL pointer (requires {@link #FILE_WAL_POINTER_SIZE} bytes)
* @param buf Byte buffer to serialize version to.
* @param ptr File WAL pointer to write.
*/
- private void putPosition(ByteBuffer buf, FileWALPointer ptr) {
+ public static void putPosition(ByteBuffer buf, FileWALPointer ptr) {
buf.putLong(ptr.index());
buf.putInt(ptr.fileOffset());
}
@@ -1392,7 +1413,7 @@ public class RecordV1Serializer implements RecordSerializer {
* @return Read file WAL pointer.
* @throws IOException If failed to write.
*/
- private FileWALPointer readPosition(DataInput in) throws IOException {
+ public static FileWALPointer readPosition(DataInput in) throws IOException {
long idx = in.readLong();
int fileOffset = in.readInt();
http://git-wip-us.apache.org/repos/asf/ignite/blob/44f3fac2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePersistentStoreDataStructuresTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePersistentStoreDataStructuresTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePersistentStoreDataStructuresTest.java
index 5561d95..fed8766 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePersistentStoreDataStructuresTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePersistentStoreDataStructuresTest.java
@@ -181,6 +181,8 @@ public class IgnitePersistentStoreDataStructuresTest extends GridCommonAbstractT
for (int i = 0; i < 100; i++)
set.add(i);
+ assertEquals(100, set.size());
+
stopAllGrids();
ignite = startGrids(4);
http://git-wip-us.apache.org/repos/asf/ignite/blob/44f3fac2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalHistoryReservationsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalHistoryReservationsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalHistoryReservationsTest.java
index 793806e..48d8c21 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalHistoryReservationsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalHistoryReservationsTest.java
@@ -297,7 +297,7 @@ public class IgniteWalHistoryReservationsTest extends GridCommonAbstractTest {
final int entryCnt = 10_000;
final int initGridCnt = 4;
- final IgniteEx ig0 = (IgniteEx)startGrids(initGridCnt);
+ final Ignite ig0 = startGrids(initGridCnt);
ig0.active(true);
http://git-wip-us.apache.org/repos/asf/ignite/blob/44f3fac2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/IgniteWalReaderTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/IgniteWalReaderTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/IgniteWalReaderTest.java
new file mode 100644
index 0000000..06bcf08
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/IgniteWalReaderTest.java
@@ -0,0 +1,385 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.db.wal.reader;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteEvents;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheRebalanceMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cache.query.annotations.QuerySqlField;
+import org.apache.ignite.configuration.BinaryConfiguration;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.MemoryConfiguration;
+import org.apache.ignite.configuration.MemoryPolicyConfiguration;
+import org.apache.ignite.configuration.PersistentStoreConfiguration;
+import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.events.Event;
+import org.apache.ignite.events.EventType;
+import org.apache.ignite.internal.pagemem.wal.WALIterator;
+import org.apache.ignite.internal.pagemem.wal.WALPointer;
+import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
+import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager;
+import org.apache.ignite.events.WalSegmentArchivedEvent;
+import org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.events.EventType.EVT_WAL_SEGMENT_ARCHIVED;
+
+/**
+ * Tests for the WAL segments reader and for the WAL segment archived event.
+ * <p>
+ * Every test starts a single node named {@code node0} with persistence configured, fills the WAL by
+ * putting {@link IndexedObject} values into {@link #CACHE_NAME} and then either reads the written
+ * segments back through WAL iterators or waits for {@code EVT_WAL_SEGMENT_ARCHIVED}.
+ * <p>
+ * Checks are done with JUnit assertions rather than the {@code assert} statement, so they are
+ * evaluated even when the JVM is started without {@code -ea}.
+ */
+public class IgniteWalReaderTest extends GridCommonAbstractTest {
+    /** Number of WAL segments passed to the persistent store configuration and to the mock reader. */
+    private static final int WAL_SEGMENTS = 10;
+
+    /** Cache name. */
+    private static final String CACHE_NAME = "cache0";
+
+    /** Fill WAL with some data before iterating. Should be {@code true} for non local run. */
+    private static final boolean fillWalBeforeTest = true;
+
+    /** Delete DB dir before test. */
+    private static final boolean deleteBefore = true;
+
+    /** Delete DB dir after test. */
+    private static final boolean deleteAfter = true;
+
+    /** Dump records to logger. Should be {@code false} for non local run. */
+    private static final boolean dumpRecords = false;
+
+    /** Page size to set. */
+    public static final int PAGE_SIZE = 4 * 1024;
+
+    /**
+     * Carries a setting from a test method to {@link #getConfiguration(String)}: when greater than
+     * zero it is applied as the "WAL auto archive after inactivity" value, in milliseconds.
+     */
+    private int archiveIncompleteSegmentAfterInactivityMs = 0;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        final IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        final CacheConfiguration<Integer, IndexedObject> ccfg = new CacheConfiguration<>(CACHE_NAME);
+
+        ccfg.setAtomicityMode(CacheAtomicityMode.ATOMIC);
+        ccfg.setRebalanceMode(CacheRebalanceMode.SYNC);
+        ccfg.setAffinity(new RendezvousAffinityFunction(false, 32));
+        ccfg.setIndexedTypes(Integer.class, IndexedObject.class);
+
+        cfg.setCacheConfiguration(ccfg);
+
+        // The archived-segment event must be included explicitly for the listeners below to see it.
+        cfg.setIncludeEventTypes(EventType.EVT_WAL_SEGMENT_ARCHIVED);
+
+        final MemoryConfiguration dbCfg = new MemoryConfiguration();
+
+        dbCfg.setPageSize(PAGE_SIZE);
+
+        final MemoryPolicyConfiguration memPlcCfg = new MemoryPolicyConfiguration();
+
+        memPlcCfg.setName("dfltMemPlc");
+        memPlcCfg.setInitialSize(1024 * 1024 * 1024);
+        memPlcCfg.setMaxSize(1024 * 1024 * 1024);
+
+        dbCfg.setMemoryPolicies(memPlcCfg);
+        dbCfg.setDefaultMemoryPolicyName("dfltMemPlc");
+
+        cfg.setMemoryConfiguration(dbCfg);
+
+        final PersistentStoreConfiguration pCfg = new PersistentStoreConfiguration();
+
+        pCfg.setWalHistorySize(1);
+        pCfg.setWalSegmentSize(1024 * 1024);
+        pCfg.setWalSegments(WAL_SEGMENTS);
+        pCfg.setWalMode(WALMode.BACKGROUND);
+
+        // Only the inactivity test sets this field; other tests keep the store default.
+        if (archiveIncompleteSegmentAfterInactivityMs > 0)
+            pCfg.setWalAutoArchiveAfterInactivity(archiveIncompleteSegmentAfterInactivityMs);
+
+        cfg.setPersistentStoreConfiguration(pCfg);
+
+        final BinaryConfiguration binCfg = new BinaryConfiguration();
+
+        binCfg.setCompactFooter(false);
+
+        cfg.setBinaryConfiguration(binCfg);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        stopAllGrids();
+
+        if (deleteBefore)
+            deleteWorkFiles();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+
+        if (deleteAfter)
+            deleteWorkFiles();
+    }
+
+    /**
+     * Removes the {@code db} folder under the default work directory. Does nothing when
+     * {@link #fillWalBeforeTest} is {@code false}, so that WAL files prepared in advance are kept.
+     *
+     * @throws IgniteCheckedException If failed.
+     */
+    private void deleteWorkFiles() throws IgniteCheckedException {
+        if (fillWalBeforeTest)
+            deleteRecursively(U.resolveWorkDirectory(U.defaultWorkDirectory(), "db", false));
+    }
+
+    /**
+     * Fills the WAL with cache updates, stops the node and checks that the standalone iterators
+     * (archive directory, archive file-by-file, work files) load a record count consistent with the
+     * count loaded by the mock based reader.
+     *
+     * @throws Exception if failed.
+     */
+    public void testFillWalAndReadRecords() throws Exception {
+        final int cacheObjectsToWrite = 10000;
+
+        if (fillWalBeforeTest) {
+            final Ignite ignite0 = startGrid("node0");
+
+            ignite0.active(true);
+
+            putDummyRecords(ignite0, cacheObjectsToWrite);
+
+            stopGrid("node0");
+        }
+
+        final File db = U.resolveWorkDirectory(U.defaultWorkDirectory(), "db", false);
+        final File wal = new File(db, "wal");
+        final File walArchive = new File(wal, "archive");
+
+        // NOTE(review): hard-coded WAL sub-folder name; presumably matches the node started above - confirm.
+        final String consistentId = "127_0_0_1_47500";
+
+        final MockWalIteratorFactory mockItFactory =
+            new MockWalIteratorFactory(log, PAGE_SIZE, consistentId, WAL_SEGMENTS);
+
+        final int cntUsingMockIter = iterateAndCount(mockItFactory.iterator(wal, walArchive));
+
+        log.info("Total records loaded " + cntUsingMockIter);
+
+        assertTrue("Mock based reader loaded no records", cntUsingMockIter > 0);
+        assertTrue("Mock based reader loaded " + cntUsingMockIter + " records, expected more than " +
+            cacheObjectsToWrite, cntUsingMockIter > cacheObjectsToWrite);
+
+        final File walArchiveDirWithConsistentId = new File(walArchive, consistentId);
+        final File walWorkDirWithConsistentId = new File(wal, consistentId);
+
+        final IgniteWalIteratorFactory factory = new IgniteWalIteratorFactory(log, PAGE_SIZE);
+
+        final int cntArchiveDir = iterateAndCount(factory.iteratorArchiveDirectory(walArchiveDirWithConsistentId));
+
+        log.info("Total records loaded using directory : " + cntArchiveDir);
+
+        final int cntArchiveFileByFile = iterateAndCount(
+            factory.iteratorArchiveFiles(
+                walArchiveDirWithConsistentId.listFiles(FileWriteAheadLogManager.WAL_SEGMENT_FILE_FILTER)));
+
+        log.info("Total records loaded using archive directory (file-by-file): " + cntArchiveFileByFile);
+
+        assertTrue("Archive file-by-file iterator loaded " + cntArchiveFileByFile + " records, expected more than " +
+            cacheObjectsToWrite, cntArchiveFileByFile > cacheObjectsToWrite);
+        assertTrue("Archive directory iterator loaded " + cntArchiveDir + " records, expected more than " +
+            cacheObjectsToWrite, cntArchiveDir > cacheObjectsToWrite);
+        assertEquals("Archive directory and file-by-file iterators loaded different record counts",
+            cntArchiveFileByFile, cntArchiveDir);
+
+        // The archive-only count may be less than the mock based count: as asserted at the end of this
+        // test, the mock based count equals archive records plus work directory records.
+        assertTrue("Mock based reader loaded " + cntUsingMockIter + " records but standalone has loaded only " +
+            cntArchiveDir, cntUsingMockIter >= cntArchiveDir);
+
+        final File[] workFiles = walWorkDirWithConsistentId.listFiles(FileWriteAheadLogManager.WAL_SEGMENT_FILE_FILTER);
+
+        final int cntWork = iterateAndCount(factory.iteratorWorkFiles(workFiles), "Work. Record: ");
+
+        log.info("Total records loaded from work: " + cntWork);
+
+        assertEquals("Work iterator loaded [" + cntWork + "] " +
+            "Archive iterator loaded [" + cntArchiveFileByFile + "]; " +
+            "mock iterator [" + cntUsingMockIter + "]",
+            cntUsingMockIter, cntWork + cntArchiveFileByFile);
+    }
+
+    /**
+     * Counts the records of the given iterator, dumping them with the default {@code "Record: "} prefix
+     * when {@link #dumpRecords} is enabled.
+     *
+     * @param walIter iterator to count, will be closed
+     * @return count of records
+     * @throws IgniteCheckedException if failed to iterate
+     */
+    private int iterateAndCount(WALIterator walIter) throws IgniteCheckedException {
+        return iterateAndCount(walIter, "Record: ");
+    }
+
+    /**
+     * Iterates the given WAL iterator to its end and counts the records it returned.
+     *
+     * @param walIter iterator to count, will be closed
+     * @param dumpPrefix prefix of the log line written per record when {@link #dumpRecords} is enabled
+     * @return count of records
+     * @throws IgniteCheckedException if failed to iterate
+     */
+    private int iterateAndCount(WALIterator walIter, String dumpPrefix) throws IgniteCheckedException {
+        int cnt = 0;
+
+        // Try-with-resources closes the iterator even if iteration fails.
+        try (WALIterator it = walIter) {
+            while (it.hasNextX()) {
+                IgniteBiTuple<WALPointer, WALRecord> next = it.nextX();
+
+                if (dumpRecords)
+                    log.info(dumpPrefix + next.get2());
+
+                cnt++;
+            }
+        }
+
+        return cnt;
+    }
+
+    /**
+     * Tests archive completed event is fired
+     *
+     * @throws Exception if failed
+     */
+    public void testArchiveCompletedEventFired() throws Exception {
+        final AtomicBoolean evtRecorded = new AtomicBoolean();
+
+        final Ignite ignite = startGrid("node0");
+
+        ignite.active(true);
+
+        final IgniteEvents evts = ignite.events();
+
+        if (!evts.isEnabled(EVT_WAL_SEGMENT_ARCHIVED))
+            return; //nothing to test
+
+        evts.localListen(new IgnitePredicate<Event>() {
+            @Override public boolean apply(Event e) {
+                WalSegmentArchivedEvent archComplEvt = (WalSegmentArchivedEvent)e;
+                long idx = archComplEvt.getAbsWalSegmentIdx();
+
+                log.info("Finished archive for segment [" + idx + ", " +
+                    archComplEvt.getArchiveFile() + "]: [" + e + "]");
+
+                evtRecorded.set(true);
+
+                // Returning true keeps the listener subscribed.
+                return true;
+            }
+        }, EVT_WAL_SEGMENT_ARCHIVED);
+
+        putDummyRecords(ignite, 150);
+
+        stopGrid("node0");
+
+        assertTrue("WAL segment archived event was not recorded", evtRecorded.get());
+    }
+
+    /**
+     * Puts provided number of records to fill WAL
+     *
+     * @param ignite ignite instance
+     * @param recordsToWrite count
+     */
+    private void putDummyRecords(Ignite ignite, int recordsToWrite) {
+        IgniteCache<Object, Object> cache0 = ignite.cache(CACHE_NAME);
+
+        for (int i = 0; i < recordsToWrite; i++)
+            cache0.put(i, new IndexedObject(i));
+    }
+
+    /**
+     * Tests time out based WAL segment archiving
+     *
+     * @throws Exception if failure occurs
+     */
+    public void testArchiveIncompleteSegmentAfterInactivity() throws Exception {
+        final AtomicBoolean waitingForEvt = new AtomicBoolean();
+        final CountDownLatch archiveSegmentForInactivity = new CountDownLatch(1);
+
+        // Must be set before startGrid(): the value is read in getConfiguration().
+        archiveIncompleteSegmentAfterInactivityMs = 1000;
+
+        final Ignite ignite = startGrid("node0");
+
+        ignite.active(true);
+
+        final IgniteEvents evts = ignite.events();
+
+        evts.localListen(new IgnitePredicate<Event>() {
+            @Override public boolean apply(Event e) {
+                WalSegmentArchivedEvent archComplEvt = (WalSegmentArchivedEvent)e;
+                long idx = archComplEvt.getAbsWalSegmentIdx();
+
+                log.info("Finished archive for segment [" + idx + ", " +
+                    archComplEvt.getArchiveFile() + "]: [" + e + "]");
+
+                // Events seen while records are still being put are ignored.
+                if (waitingForEvt.get())
+                    archiveSegmentForInactivity.countDown();
+
+                return true;
+            }
+        }, EVT_WAL_SEGMENT_ARCHIVED);
+
+        putDummyRecords(ignite, 100);
+        waitingForEvt.set(true); //flag for skipping regular log() and rollOver()
+
+        log.info("Wait for archiving segment for inactive grid started");
+
+        boolean recordedAfterSleep =
+            archiveSegmentForInactivity.await(archiveIncompleteSegmentAfterInactivityMs + 1001, TimeUnit.MILLISECONDS);
+
+        stopGrid("node0");
+
+        assertTrue("WAL segment was not archived after inactivity timeout", recordedAfterSleep);
+    }
+
+    /** Test object for placing into grid in this test */
+    private static class IndexedObject {
+        /** Indexed integer value. */
+        @QuerySqlField(index = true)
+        private int iVal;
+
+        /** Data filled with recognizable pattern */
+        private byte[] data;
+
+        /**
+         * Creates an object holding the given value and a 40000 byte payload in which the ten
+         * characters starting at {@code 'A'} repeat.
+         *
+         * @param iVal Integer value.
+         */
+        private IndexedObject(int iVal) {
+            this.iVal = iVal;
+
+            int sz = 40000;
+
+            data = new byte[sz];
+
+            for (int i = 0; i < sz; i++)
+                data[i] = (byte)('A' + (i % 10));
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            IndexedObject obj = (IndexedObject)o;
+
+            if (iVal != obj.iVal)
+                return false;
+
+            return Arrays.equals(data, obj.data);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            int res = iVal;
+
+            res = 31 * res + Arrays.hashCode(data);
+
+            return res;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(IndexedObject.class, this);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/44f3fac2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/MockWalIteratorFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/MockWalIteratorFactory.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/MockWalIteratorFactory.java
new file mode 100644
index 0000000..95079a0
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/MockWalIteratorFactory.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.db.wal.reader;
+
+import java.io.File;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.PersistentStoreConfiguration;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
+import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
+import org.apache.ignite.internal.pagemem.wal.WALIterator;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager;
+import org.jetbrains.annotations.Nullable;
+import org.mockito.Mockito;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.when;
+
+/**
+ * Mockito based WAL iterator provider.
+ * <p>
+ * Builds a {@link FileWriteAheadLogManager} on top of mocked kernal and shared contexts, so that a WAL
+ * iterator can be obtained from the manager without starting a node. Only the configuration getters
+ * and context accessors stubbed in {@link #iterator(File, File)} return meaningful values.
+ */
+public class MockWalIteratorFactory {
+    /** Logger. */
+    private final IgniteLogger log;
+
+    /** Page size. */
+    private final int pageSize;
+
+    /** Consistent node id. */
+    private final String consistentId;
+
+    /** Segments count in work dir. */
+    private final int segments;
+
+    /**
+     * Creates factory
+     *
+     * @param log Logger, a Mockito mock logger is used when {@code null}.
+     * @param pageSize Page size.
+     * @param consistentId Consistent id.
+     * @param segments Segments.
+     */
+    public MockWalIteratorFactory(@Nullable IgniteLogger log, int pageSize, String consistentId, int segments) {
+        this.log = log == null ? Mockito.mock(IgniteLogger.class) : log;
+        this.pageSize = pageSize;
+        this.consistentId = consistentId;
+        this.segments = segments;
+    }
+
+    /**
+     * Creates iterator by starting a {@link FileWriteAheadLogManager} with mocked contexts and
+     * returning the result of its {@code replay(null)} call.
+     *
+     * @param wal WAL directory without node id
+     * @param walArchive WAL archive without node id
+     * @return iterator
+     * @throws IgniteCheckedException if IO failed
+     */
+    public WALIterator iterator(File wal, File walArchive) throws IgniteCheckedException {
+        // Persistent store settings: paths and segment count come from the caller, the rest are defaults.
+        final PersistentStoreConfiguration persistentCfg1 = Mockito.mock(PersistentStoreConfiguration.class);
+
+        when(persistentCfg1.getWalStorePath()).thenReturn(wal.getAbsolutePath());
+        when(persistentCfg1.getWalArchivePath()).thenReturn(walArchive.getAbsolutePath());
+        when(persistentCfg1.getWalSegments()).thenReturn(segments);
+        when(persistentCfg1.getTlbSize()).thenReturn(PersistentStoreConfiguration.DFLT_TLB_SIZE);
+        when(persistentCfg1.getWalRecordIteratorBufferSize())
+            .thenReturn(PersistentStoreConfiguration.DFLT_WAL_RECORD_ITERATOR_BUFFER_SIZE);
+
+        final IgniteConfiguration cfg = Mockito.mock(IgniteConfiguration.class);
+
+        when(cfg.getPersistentStoreConfiguration()).thenReturn(persistentCfg1);
+
+        // Kernal context of a server (non-client) node exposing the configuration above.
+        final GridKernalContext ctx = Mockito.mock(GridKernalContext.class);
+
+        when(ctx.config()).thenReturn(cfg);
+        when(ctx.clientNode()).thenReturn(false);
+
+        final GridDiscoveryManager disco = Mockito.mock(GridDiscoveryManager.class);
+
+        when(disco.consistentId()).thenReturn(consistentId);
+        when(ctx.discovery()).thenReturn(disco);
+
+        // All kernal context stubs are in place before the manager is constructed from it.
+        final IgniteWriteAheadLogManager mgr = new FileWriteAheadLogManager(ctx);
+        final GridCacheSharedContext sctx = Mockito.mock(GridCacheSharedContext.class);
+
+        when(sctx.kernalContext()).thenReturn(ctx);
+        when(sctx.discovery()).thenReturn(disco);
+
+        final GridCacheDatabaseSharedManager database = Mockito.mock(GridCacheDatabaseSharedManager.class);
+
+        when(database.pageSize()).thenReturn(pageSize);
+        when(sctx.database()).thenReturn(database);
+        when(sctx.logger(any(Class.class))).thenReturn(log);
+
+        mgr.start(sctx);
+
+        // NOTE(review): a null start pointer presumably means "replay from the first available record" - confirm.
+        return mgr.replay(null);
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/44f3fac2/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
index 351f52e..8018705 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
@@ -18,22 +18,18 @@
package org.apache.ignite.testsuites;
import junit.framework.TestSuite;
-import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsAtomicCacheRebalancingTest;
-import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsTxCacheRebalancingTest;
import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsContinuousRestartTest;
import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsPageSizesTest;
import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsRecoveryAfterFileCorruptionTest;
import org.apache.ignite.internal.processors.cache.persistence.IgnitePersistenceMetricsSelfTest;
import org.apache.ignite.internal.processors.cache.persistence.IgnitePersistentStoreDataStructuresTest;
import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsPageEvictionDuringPartitionClearTest;
+import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsRebalancingOnNotStableTopologyTest;
import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsTransactionsHangTest;
import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsWholeClusterRestartTest;
-import org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsRebalancingOnNotStableTopologyTest;
-import org.apache.ignite.internal.processors.cache.persistence.db.file.IgnitePdsNoActualWalHistoryTest;
import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalHistoryReservationsTest;
-import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalRecoveryTest;
-import org.apache.ignite.internal.processors.cache.persistence.db.wal.WalRecoveryTxLogicalRecordsTest;
import org.apache.ignite.internal.processors.cache.persistence.db.wal.crc.IgniteDataIntegrityTests;
+import org.apache.ignite.internal.processors.cache.persistence.db.wal.reader.IgniteWalReaderTest;
/**
*
@@ -69,6 +65,7 @@ public class IgnitePdsTestSuite2 extends TestSuite {
suite.addTestSuite(IgnitePersistentStoreDataStructuresTest.class);
+ suite.addTestSuite(IgniteWalReaderTest.class);
return suite;
}
}