You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vk...@apache.org on 2016/06/14 15:09:48 UTC
[09/10] ignite git commit: IGNITE-2938: IGFS: Puts during secondary
file reads are now performed synchronously and with proper semantics.
IGNITE-2938: IGFS: Puts during secondary file reads are now performed synchronously and with proper semantics.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/98a0990c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/98a0990c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/98a0990c
Branch: refs/heads/ignite-3215
Commit: 98a0990c90fbfe5737f4f0f2d1c34a84fd0a6dde
Parents: 54425bf
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Tue Jun 14 17:39:44 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Tue Jun 14 17:39:44 2016 +0300
----------------------------------------------------------------------
.../configuration/FileSystemConfiguration.java | 18 +-
.../processors/igfs/IgfsDataManager.java | 166 ++++---------------
.../igfs/data/IgfsDataPutProcessor.java | 99 +++++++++++
3 files changed, 147 insertions(+), 136 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/98a0990c/modules/core/src/main/java/org/apache/ignite/configuration/FileSystemConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/FileSystemConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/FileSystemConfiguration.java
index 074636a..88b0c28 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/FileSystemConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/FileSystemConfiguration.java
@@ -704,7 +704,9 @@ public class FileSystemConfiguration {
* Gets maximum timeout awaiting for trash purging in case data cache oversize is detected.
*
* @return Maximum timeout awaiting for trash purging in case data cache oversize is detected.
+ * @deprecated Not used any more.
*/
+ @Deprecated
public long getTrashPurgeTimeout() {
return trashPurgeTimeout;
}
@@ -713,7 +715,9 @@ public class FileSystemConfiguration {
* Sets maximum timeout awaiting for trash purging in case data cache oversize is detected.
*
* @param trashPurgeTimeout Maximum timeout awaiting for trash purging in case data cache oversize is detected.
+ * @deprecated Not used any more.
*/
+ @Deprecated
public void setTrashPurgeTimeout(long trashPurgeTimeout) {
this.trashPurgeTimeout = trashPurgeTimeout;
}
@@ -724,8 +728,10 @@ public class FileSystemConfiguration {
* In case no executor service is provided, default one will be created with maximum amount of threads equals
* to amount of processor cores.
*
- * @return Get DUAL mode put operation executor service
+ * @return Get DUAL mode put operation executor service.
+ * @deprecated Not used any more.
*/
+ @Deprecated
@Nullable public ExecutorService getDualModePutExecutorService() {
return dualModePutExec;
}
@@ -734,7 +740,9 @@ public class FileSystemConfiguration {
* Set DUAL mode put operations executor service.
*
* @param dualModePutExec Dual mode put operations executor service.
+ * @deprecated Not used any more.
*/
+ @Deprecated
public void setDualModePutExecutorService(ExecutorService dualModePutExec) {
this.dualModePutExec = dualModePutExec;
}
@@ -743,7 +751,9 @@ public class FileSystemConfiguration {
* Get DUAL mode put operation executor service shutdown flag.
*
* @return DUAL mode put operation executor service shutdown flag.
+ * @deprecated Not used any more.
*/
+ @Deprecated
public boolean getDualModePutExecutorServiceShutdown() {
return dualModePutExecShutdown;
}
@@ -752,7 +762,9 @@ public class FileSystemConfiguration {
* Set DUAL mode put operations executor service shutdown flag.
*
* @param dualModePutExecShutdown Dual mode put operations executor service shutdown flag.
+ * @deprecated Not used any more.
*/
+ @Deprecated
public void setDualModePutExecutorServiceShutdown(boolean dualModePutExecShutdown) {
this.dualModePutExecShutdown = dualModePutExecShutdown;
}
@@ -766,7 +778,9 @@ public class FileSystemConfiguration {
* avoid issues with increasing GC pauses or out-of-memory error.
*
* @return Maximum amount of pending data read from the secondary file system
+ * @deprecated Not used any more.
*/
+ @Deprecated
public long getDualModeMaxPendingPutsSize() {
return dualModeMaxPendingPutsSize;
}
@@ -775,7 +789,9 @@ public class FileSystemConfiguration {
* Set maximum amount of data in pending put operations.
*
* @param dualModeMaxPendingPutsSize Maximum amount of data in pending put operations.
+ * @deprecated Not used any more.
*/
+ @Deprecated
public void setDualModeMaxPendingPutsSize(long dualModeMaxPendingPutsSize) {
this.dualModeMaxPendingPutsSize = dualModeMaxPendingPutsSize;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/98a0990c/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
index 34d77f9..cb2b630 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
@@ -41,6 +41,7 @@ import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.datastreamer.DataStreamerCacheUpdaters;
+import org.apache.ignite.internal.processors.igfs.data.IgfsDataPutProcessor;
import org.apache.ignite.internal.processors.task.GridInternal;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -53,7 +54,6 @@ import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteUuid;
-import org.apache.ignite.thread.IgniteThreadPoolExecutor;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentHashMap8;
@@ -78,10 +78,8 @@ import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
@@ -98,9 +96,6 @@ import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_REA
* Cache based file's data container.
*/
public class IgfsDataManager extends IgfsManager {
- /** IGFS. */
- private IgfsEx igfs;
-
/** Data internal cache. */
private IgniteInternalCache<IgfsBlockKey, byte[]> dataCachePrj;
@@ -143,31 +138,10 @@ public class IgfsDataManager extends IgfsManager {
/** Async file delete worker. */
private AsyncDeleteWorker delWorker;
- /** Trash purge timeout. */
- private long trashPurgeTimeout;
-
/** On-going remote reads futures. */
private final ConcurrentHashMap8<IgfsBlockKey, IgniteInternalFuture<byte[]>> rmtReadFuts =
new ConcurrentHashMap8<>();
- /** Executor service for puts in dual mode */
- private volatile ExecutorService putExecSvc;
-
- /** Executor service for puts in dual mode shutdown flag. */
- private volatile boolean putExecSvcShutdown;
-
- /** Maximum amount of data in pending puts. */
- private volatile long maxPendingPuts;
-
- /** Current amount of data in pending puts. */
- private long curPendingPuts;
-
- /** Lock for pending puts. */
- private final Lock pendingPutsLock = new ReentrantLock();
-
- /** Condition for pending puts. */
- private final Condition pendingPutsCond = pendingPutsLock.newCondition();
-
/**
*
*/
@@ -182,8 +156,6 @@ public class IgfsDataManager extends IgfsManager {
/** {@inheritDoc} */
@Override protected void start0() throws IgniteCheckedException {
- igfs = igfsCtx.igfs();
-
dataCacheStartLatch = new CountDownLatch(1);
String igfsName = igfsCtx.configuration().getName();
@@ -216,23 +188,6 @@ public class IgfsDataManager extends IgfsManager {
igfsSvc = igfsCtx.kernalContext().getIgfsExecutorService();
- trashPurgeTimeout = igfsCtx.configuration().getTrashPurgeTimeout();
-
- putExecSvc = igfsCtx.configuration().getDualModePutExecutorService();
-
- if (putExecSvc != null)
- putExecSvcShutdown = igfsCtx.configuration().getDualModePutExecutorServiceShutdown();
- else {
- int coresCnt = Runtime.getRuntime().availableProcessors();
-
- // Note that we do not pre-start threads here as IGFS pool may not be needed.
- putExecSvc = new IgniteThreadPoolExecutor(coresCnt, coresCnt, 0, new LinkedBlockingDeque<Runnable>());
-
- putExecSvcShutdown = true;
- }
-
- maxPendingPuts = igfsCtx.configuration().getDualModeMaxPendingPutsSize();
-
delWorker = new AsyncDeleteWorker(igfsCtx.kernalContext().gridName(),
"igfs-" + igfsName + "-delete-worker", log);
}
@@ -282,9 +237,6 @@ public class IgfsDataManager extends IgfsManager {
catch (IgniteInterruptedCheckedException e) {
log.warning("Got interrupter while waiting for delete worker to stop (will continue stopping).", e);
}
-
- if (putExecSvcShutdown)
- U.shutdownNow(getClass(), putExecSvc, log);
}
/**
@@ -308,6 +260,7 @@ public class IgfsDataManager extends IgfsManager {
* @param prevAffKey Affinity key of previous block.
* @return Affinity key.
*/
+ @SuppressWarnings("ConstantConditions")
public IgniteUuid nextAffinityKey(@Nullable IgniteUuid prevAffKey) {
// Do not generate affinity key for non-affinity nodes.
if (!dataCache.context().affinityNode())
@@ -371,8 +324,6 @@ public class IgfsDataManager extends IgfsManager {
@Nullable public IgniteInternalFuture<byte[]> dataBlock(final IgfsEntryInfo fileInfo, final IgfsPath path,
final long blockIdx, @Nullable final IgfsSecondaryFileSystemPositionedReadable secReader)
throws IgniteCheckedException {
- //assert validTxState(any); // Allow this method call for any transaction state.
-
assert fileInfo != null;
assert blockIdx >= 0;
@@ -435,7 +386,7 @@ public class IgfsDataManager extends IgfsManager {
rmtReadFut.onDone(res);
- putSafe(key, res);
+ putBlock(fileInfo.blockSize(), key, res);
metrics.addReadBlocks(1, 1);
}
@@ -471,6 +422,26 @@ public class IgfsDataManager extends IgfsManager {
}
/**
+ * Stores the given block in data cache.
+ * <p>
+ * A block shorter than {@code blockSize} is written through {@link IgfsDataPutProcessor}; a block of exactly
+ * {@code blockSize} bytes is written with a plain cache put.
+ *
+ * @param blockSize The size of the block.
+ * @param key The data cache key of the block.
+ * @param data The new value of the block.
+ * @throws IgniteCheckedException Declared by the signature; presumably propagated from the cache calls.
+ */
+ private void putBlock(int blockSize, IgfsBlockKey key, byte[] data) throws IgniteCheckedException {
+ if (data.length < blockSize)
+ // Partial (incomplete) block: hand it to the entry processor instead of putting it directly.
+ dataCachePrj.invoke(key, new IgfsDataPutProcessor(data));
+ else {
+ // Whole block: data longer than blockSize is not expected (asserted below).
+ assert data.length == blockSize;
+
+ dataCachePrj.put(key, data);
+ }
+ }
+
+
+ /**
* Registers write future in igfs data manager.
*
* @param fileId File ID.
@@ -680,7 +651,7 @@ public class IgfsDataManager extends IgfsManager {
byte[] val = vals.get(colocatedKey);
if (val != null) {
- dataCachePrj.put(key, val);
+ putBlock(fileInfo.blockSize(), key, val);
tx.commit();
}
@@ -744,7 +715,6 @@ public class IgfsDataManager extends IgfsManager {
*/
public Collection<IgfsBlockLocation> affinity(IgfsEntryInfo info, long start, long len, long maxLen)
throws IgniteCheckedException {
- assert validTxState(false);
assert info.isFile() : "Failed to get affinity (not a file): " + info;
assert start >= 0 : "Start position should not be negative: " + start;
assert len >= 0 : "Part length should not be negative: " + len;
@@ -974,21 +944,6 @@ public class IgfsDataManager extends IgfsManager {
}
/**
- * Check transaction is (not) started.
- *
- * @param inTx Expected transaction state.
- * @return Transaction state is correct.
- */
- private boolean validTxState(boolean inTx) {
- boolean txState = inTx == (dataCachePrj.tx() != null);
-
- assert txState : (inTx ? "Method cannot be called outside transaction: " :
- "Method cannot be called in transaction: ") + dataCachePrj.tx();
-
- return txState;
- }
-
- /**
* @param fileId File ID.
* @param node Node to process blocks on.
* @param blocks Blocks to put in cache.
@@ -1056,10 +1011,11 @@ public class IgfsDataManager extends IgfsManager {
* @param colocatedKey Block key.
* @param startOff Data start offset within block.
* @param data Data to write.
+ * @param blockSize The block size.
* @throws IgniteCheckedException If update failed.
*/
private void processPartialBlockWrite(IgniteUuid fileId, IgfsBlockKey colocatedKey, int startOff,
- byte[] data) throws IgniteCheckedException {
+ byte[] data, int blockSize) throws IgniteCheckedException {
if (dataCachePrj.igfsDataSpaceUsed() >= dataCachePrj.igfsDataSpaceMax()) {
final WriteCompletionFuture completionFut = pendingWrites.get(fileId);
@@ -1090,7 +1046,7 @@ public class IgfsDataManager extends IgfsManager {
// If writing from block beginning, just put and return.
if (startOff == 0) {
- dataCachePrj.put(colocatedKey, data);
+ putBlock(blockSize, colocatedKey, data);
return;
}
@@ -1151,67 +1107,6 @@ public class IgfsDataManager extends IgfsManager {
}
/**
- * Put data block read from the secondary file system to the cache.
- *
- * @param key Key.
- * @param data Data.
- * @throws IgniteCheckedException If failed.
- */
- private void putSafe(final IgfsBlockKey key, final byte[] data) throws IgniteCheckedException {
- assert key != null;
- assert data != null;
-
- if (maxPendingPuts > 0) {
- pendingPutsLock.lock();
-
- try {
- while (curPendingPuts > maxPendingPuts)
- pendingPutsCond.await(2000, TimeUnit.MILLISECONDS);
-
- curPendingPuts += data.length;
- }
- catch (InterruptedException ignore) {
- throw new IgniteCheckedException("Failed to put IGFS data block into cache due to interruption: " + key);
- }
- finally {
- pendingPutsLock.unlock();
- }
- }
-
- Runnable task = new Runnable() {
- @Override public void run() {
- try {
- dataCachePrj.put(key, data);
- }
- catch (IgniteCheckedException e) {
- U.warn(log, "Failed to put IGFS data block into cache [key=" + key + ", err=" + e + ']');
- }
- finally {
- if (maxPendingPuts > 0) {
- pendingPutsLock.lock();
-
- try {
- curPendingPuts -= data.length;
-
- pendingPutsCond.signalAll();
- }
- finally {
- pendingPutsLock.unlock();
- }
- }
- }
- }
- };
-
- try {
- putExecSvc.submit(task);
- }
- catch (RejectedExecutionException ignore) {
- task.run();
- }
- }
-
- /**
* @param blocks Blocks to write.
* @return Future that will be completed after put is done.
*/
@@ -1261,6 +1156,7 @@ public class IgfsDataManager extends IgfsManager {
* @param nodeId Node ID.
* @param ackMsg Write acknowledgement message.
*/
+ @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
private void processAckMessage(UUID nodeId, IgfsAckMessage ackMsg) {
try {
ackMsg.finishUnmarshal(igfsCtx.kernalContext().config().getMarshaller(), null);
@@ -1343,6 +1239,7 @@ public class IgfsDataManager extends IgfsManager {
* @throws IgniteCheckedException If failed.
* @return Data remainder if {@code flush} flag is {@code false}.
*/
+ @SuppressWarnings("ConstantConditions")
@Nullable public byte[] storeDataBlocks(
IgfsEntryInfo fileInfo,
long reservedLen,
@@ -1456,7 +1353,7 @@ public class IgfsDataManager extends IgfsManager {
if (size != blockSize) {
// Partial writes must be always synchronous.
- processPartialBlockWrite(id, key, block == first ? off : 0, portion);
+ processPartialBlockWrite(id, key, block == first ? off : 0, portion, blockSize);
writtenTotal++;
}
@@ -1617,8 +1514,6 @@ public class IgfsDataManager extends IgfsManager {
protected AsyncDeleteWorker(@Nullable String gridName, String name, IgniteLogger log) {
super(gridName, name, log);
- long time = System.currentTimeMillis();
-
stopInfo = IgfsUtils.createDirectory(IgniteUuid.randomUuid());
}
@@ -1642,6 +1537,7 @@ public class IgfsDataManager extends IgfsManager {
}
/** {@inheritDoc} */
+ @SuppressWarnings("ConstantConditions")
@Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
try {
while (!isCancelled()) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/98a0990c/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/data/IgfsDataPutProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/data/IgfsDataPutProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/data/IgfsDataPutProcessor.java
new file mode 100644
index 0000000..2029d4e
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/data/IgfsDataPutProcessor.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.igfs.data;
+
+import org.apache.ignite.binary.BinaryObjectException;
+import org.apache.ignite.binary.BinaryReader;
+import org.apache.ignite.binary.BinaryWriter;
+import org.apache.ignite.binary.Binarylizable;
+import org.apache.ignite.internal.processors.igfs.IgfsBlockKey;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+import javax.cache.processor.EntryProcessor;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.MutableEntry;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+
+/**
+ * Entry processor to set or replace block byte value.
+ * <p>
+ * The new value is stored only when the entry has no value yet or when the new value is longer than the
+ * current one; otherwise the entry is left untouched.
+ */
+public class IgfsDataPutProcessor implements EntryProcessor<IgfsBlockKey, byte[], Void>, Externalizable, Binarylizable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** The new value. */
+ private byte[] newVal;
+
+ /**
+ * No-arg constructor required by {@link Externalizable}.
+ */
+ public IgfsDataPutProcessor() {
+ // no-op
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param newVal The new value, must not be {@code null} (asserted).
+ */
+ public IgfsDataPutProcessor(byte[] newVal) {
+ assert newVal != null;
+
+ this.newVal = newVal;
+ }
+
+ /**
+ * Replaces the entry value with the new value if the entry is empty or holds a shorter value.
+ *
+ * @param entry Cache entry to update.
+ * @param args Not used.
+ * @return Always {@code null}.
+ * @throws EntryProcessorException Declared by the interface contract.
+ */
+ @Override public Void process(MutableEntry<IgfsBlockKey, byte[]> entry, Object... args)
+ throws EntryProcessorException {
+ byte[] curVal = entry.getValue();
+
+ // Keep the existing value unless it is absent or shorter than the new one.
+ if (curVal == null || newVal.length > curVal.length)
+ entry.setValue(newVal);
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ newVal = U.readByteArray(in);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ U.writeByteArray(out, newVal);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readBinary(BinaryReader reader) throws BinaryObjectException {
+ newVal = reader.rawReader().readByteArray();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeBinary(BinaryWriter writer) throws BinaryObjectException {
+ writer.rawWriter().writeByteArray(newVal);
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(IgfsDataPutProcessor.class, this);
+ }
+}