You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vk...@apache.org on 2016/06/14 15:09:48 UTC

[09/10] ignite git commit: IGNITE-2938: IGFS: Puts during secondary file reads are now performed synchronously and with proper semantics.

IGNITE-2938: IGFS: Puts during secondary file reads are now performed synchronously and with proper semantics.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/98a0990c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/98a0990c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/98a0990c

Branch: refs/heads/ignite-3215
Commit: 98a0990c90fbfe5737f4f0f2d1c34a84fd0a6dde
Parents: 54425bf
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Tue Jun 14 17:39:44 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Tue Jun 14 17:39:44 2016 +0300

----------------------------------------------------------------------
 .../configuration/FileSystemConfiguration.java  |  18 +-
 .../processors/igfs/IgfsDataManager.java        | 166 ++++---------------
 .../igfs/data/IgfsDataPutProcessor.java         |  99 +++++++++++
 3 files changed, 147 insertions(+), 136 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/98a0990c/modules/core/src/main/java/org/apache/ignite/configuration/FileSystemConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/FileSystemConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/FileSystemConfiguration.java
index 074636a..88b0c28 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/FileSystemConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/FileSystemConfiguration.java
@@ -704,7 +704,9 @@ public class FileSystemConfiguration {
      * Gets maximum timeout awaiting for trash purging in case data cache oversize is detected.
      *
      * @return Maximum timeout awaiting for trash purging in case data cache oversize is detected.
+     * @deprecated Not used any more.
      */
+    @Deprecated
     public long getTrashPurgeTimeout() {
         return trashPurgeTimeout;
     }
@@ -713,7 +715,9 @@ public class FileSystemConfiguration {
      * Sets maximum timeout awaiting for trash purging in case data cache oversize is detected.
      *
      * @param trashPurgeTimeout Maximum timeout awaiting for trash purging in case data cache oversize is detected.
+     * @deprecated Not used any more.
      */
+    @Deprecated
     public void setTrashPurgeTimeout(long trashPurgeTimeout) {
         this.trashPurgeTimeout = trashPurgeTimeout;
     }
@@ -724,8 +728,10 @@ public class FileSystemConfiguration {
      * In case no executor service is provided, default one will be created with maximum amount of threads equals
      * to amount of processor cores.
      *
-     * @return Get DUAL mode put operation executor service
+     * @return Get DUAL mode put operation executor service.
+     * @deprecated Not used any more.
      */
+    @Deprecated
     @Nullable public ExecutorService getDualModePutExecutorService() {
         return dualModePutExec;
     }
@@ -734,7 +740,9 @@ public class FileSystemConfiguration {
      * Set DUAL mode put operations executor service.
      *
      * @param dualModePutExec Dual mode put operations executor service.
+     * @deprecated Not used any more.
      */
+    @Deprecated
     public void setDualModePutExecutorService(ExecutorService dualModePutExec) {
         this.dualModePutExec = dualModePutExec;
     }
@@ -743,7 +751,9 @@ public class FileSystemConfiguration {
      * Get DUAL mode put operation executor service shutdown flag.
      *
      * @return DUAL mode put operation executor service shutdown flag.
+     * @deprecated Not used any more.
      */
+    @Deprecated
     public boolean getDualModePutExecutorServiceShutdown() {
         return dualModePutExecShutdown;
     }
@@ -752,7 +762,9 @@ public class FileSystemConfiguration {
      * Set DUAL mode put operations executor service shutdown flag.
      *
      * @param dualModePutExecShutdown Dual mode put operations executor service shutdown flag.
+     * @deprecated Not used any more.
      */
+    @Deprecated
     public void setDualModePutExecutorServiceShutdown(boolean dualModePutExecShutdown) {
         this.dualModePutExecShutdown = dualModePutExecShutdown;
     }
@@ -766,7 +778,9 @@ public class FileSystemConfiguration {
      * avoid issues with increasing GC pauses or out-of-memory error.
      *
      * @return Maximum amount of pending data read from the secondary file system
+     * @deprecated Not used any more.
      */
+    @Deprecated
     public long getDualModeMaxPendingPutsSize() {
         return dualModeMaxPendingPutsSize;
     }
@@ -775,7 +789,9 @@ public class FileSystemConfiguration {
      * Set maximum amount of data in pending put operations.
      *
      * @param dualModeMaxPendingPutsSize Maximum amount of data in pending put operations.
+     * @deprecated Not used any more.
      */
+    @Deprecated
     public void setDualModeMaxPendingPutsSize(long dualModeMaxPendingPutsSize) {
         this.dualModeMaxPendingPutsSize = dualModeMaxPendingPutsSize;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/98a0990c/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
index 34d77f9..cb2b630 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
@@ -41,6 +41,7 @@ import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.datastreamer.DataStreamerCacheUpdaters;
+import org.apache.ignite.internal.processors.igfs.data.IgfsDataPutProcessor;
 import org.apache.ignite.internal.processors.task.GridInternal;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -53,7 +54,6 @@ import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.internal.util.worker.GridWorker;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteUuid;
-import org.apache.ignite.thread.IgniteThreadPoolExecutor;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
 
@@ -78,10 +78,8 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
@@ -98,9 +96,6 @@ import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_REA
  * Cache based file's data container.
  */
 public class IgfsDataManager extends IgfsManager {
-    /** IGFS. */
-    private IgfsEx igfs;
-
     /** Data internal cache. */
     private IgniteInternalCache<IgfsBlockKey, byte[]> dataCachePrj;
 
@@ -143,31 +138,10 @@ public class IgfsDataManager extends IgfsManager {
     /** Async file delete worker. */
     private AsyncDeleteWorker delWorker;
 
-    /** Trash purge timeout. */
-    private long trashPurgeTimeout;
-
     /** On-going remote reads futures. */
     private final ConcurrentHashMap8<IgfsBlockKey, IgniteInternalFuture<byte[]>> rmtReadFuts =
         new ConcurrentHashMap8<>();
 
-    /** Executor service for puts in dual mode */
-    private volatile ExecutorService putExecSvc;
-
-    /** Executor service for puts in dual mode shutdown flag. */
-    private volatile boolean putExecSvcShutdown;
-
-    /** Maximum amount of data in pending puts. */
-    private volatile long maxPendingPuts;
-
-    /** Current amount of data in pending puts. */
-    private long curPendingPuts;
-
-    /** Lock for pending puts. */
-    private final Lock pendingPutsLock = new ReentrantLock();
-
-    /** Condition for pending puts. */
-    private final Condition pendingPutsCond = pendingPutsLock.newCondition();
-
     /**
      *
      */
@@ -182,8 +156,6 @@ public class IgfsDataManager extends IgfsManager {
 
     /** {@inheritDoc} */
     @Override protected void start0() throws IgniteCheckedException {
-        igfs = igfsCtx.igfs();
-
         dataCacheStartLatch = new CountDownLatch(1);
 
         String igfsName = igfsCtx.configuration().getName();
@@ -216,23 +188,6 @@ public class IgfsDataManager extends IgfsManager {
 
         igfsSvc = igfsCtx.kernalContext().getIgfsExecutorService();
 
-        trashPurgeTimeout = igfsCtx.configuration().getTrashPurgeTimeout();
-
-        putExecSvc = igfsCtx.configuration().getDualModePutExecutorService();
-
-        if (putExecSvc != null)
-            putExecSvcShutdown = igfsCtx.configuration().getDualModePutExecutorServiceShutdown();
-        else {
-            int coresCnt = Runtime.getRuntime().availableProcessors();
-
-            // Note that we do not pre-start threads here as IGFS pool may not be needed.
-            putExecSvc = new IgniteThreadPoolExecutor(coresCnt, coresCnt, 0, new LinkedBlockingDeque<Runnable>());
-
-            putExecSvcShutdown = true;
-        }
-
-        maxPendingPuts = igfsCtx.configuration().getDualModeMaxPendingPutsSize();
-
         delWorker = new AsyncDeleteWorker(igfsCtx.kernalContext().gridName(),
             "igfs-" + igfsName + "-delete-worker", log);
     }
@@ -282,9 +237,6 @@ public class IgfsDataManager extends IgfsManager {
         catch (IgniteInterruptedCheckedException e) {
             log.warning("Got interrupter while waiting for delete worker to stop (will continue stopping).", e);
         }
-
-        if (putExecSvcShutdown)
-            U.shutdownNow(getClass(), putExecSvc, log);
     }
 
     /**
@@ -308,6 +260,7 @@ public class IgfsDataManager extends IgfsManager {
      * @param prevAffKey Affinity key of previous block.
      * @return Affinity key.
      */
+    @SuppressWarnings("ConstantConditions")
     public IgniteUuid nextAffinityKey(@Nullable IgniteUuid prevAffKey) {
         // Do not generate affinity key for non-affinity nodes.
         if (!dataCache.context().affinityNode())
@@ -371,8 +324,6 @@ public class IgfsDataManager extends IgfsManager {
     @Nullable public IgniteInternalFuture<byte[]> dataBlock(final IgfsEntryInfo fileInfo, final IgfsPath path,
         final long blockIdx, @Nullable final IgfsSecondaryFileSystemPositionedReadable secReader)
         throws IgniteCheckedException {
-        //assert validTxState(any); // Allow this method call for any transaction state.
-
         assert fileInfo != null;
         assert blockIdx >= 0;
 
@@ -435,7 +386,7 @@ public class IgfsDataManager extends IgfsManager {
 
                                 rmtReadFut.onDone(res);
 
-                                putSafe(key, res);
+                                putBlock(fileInfo.blockSize(), key, res);
 
                                 metrics.addReadBlocks(1, 1);
                             }
@@ -471,6 +422,26 @@ public class IgfsDataManager extends IgfsManager {
     }
 
     /**
+     * Stores the given block in data cache.
+     *
+     * @param blockSize The size of the block.
+     * @param key The data cache key of the block.
+     * @param data The new value of the block.
+     */
+    private void putBlock(int blockSize, IgfsBlockKey key, byte[] data) throws IgniteCheckedException {
+        if (data.length < blockSize)
+            // partial (incomplete) block:
+            dataCachePrj.invoke(key, new IgfsDataPutProcessor(data));
+        else {
+            // whole block:
+            assert data.length == blockSize;
+
+            dataCachePrj.put(key, data);
+        }
+    }
+
+
+    /**
      * Registers write future in igfs data manager.
      *
      * @param fileId File ID.
@@ -680,7 +651,7 @@ public class IgfsDataManager extends IgfsManager {
                                 byte[] val = vals.get(colocatedKey);
 
                                 if (val != null) {
-                                    dataCachePrj.put(key, val);
+                                    putBlock(fileInfo.blockSize(), key, val);
 
                                     tx.commit();
                                 }
@@ -744,7 +715,6 @@ public class IgfsDataManager extends IgfsManager {
      */
     public Collection<IgfsBlockLocation> affinity(IgfsEntryInfo info, long start, long len, long maxLen)
         throws IgniteCheckedException {
-        assert validTxState(false);
         assert info.isFile() : "Failed to get affinity (not a file): " + info;
         assert start >= 0 : "Start position should not be negative: " + start;
         assert len >= 0 : "Part length should not be negative: " + len;
@@ -974,21 +944,6 @@ public class IgfsDataManager extends IgfsManager {
     }
 
     /**
-     * Check transaction is (not) started.
-     *
-     * @param inTx Expected transaction state.
-     * @return Transaction state is correct.
-     */
-    private boolean validTxState(boolean inTx) {
-        boolean txState = inTx == (dataCachePrj.tx() != null);
-
-        assert txState : (inTx ? "Method cannot be called outside transaction: " :
-            "Method cannot be called in transaction: ") + dataCachePrj.tx();
-
-        return txState;
-    }
-
-    /**
      * @param fileId File ID.
      * @param node Node to process blocks on.
      * @param blocks Blocks to put in cache.
@@ -1056,10 +1011,11 @@ public class IgfsDataManager extends IgfsManager {
      * @param colocatedKey Block key.
      * @param startOff Data start offset within block.
      * @param data Data to write.
+     * @param blockSize The block size.
      * @throws IgniteCheckedException If update failed.
      */
     private void processPartialBlockWrite(IgniteUuid fileId, IgfsBlockKey colocatedKey, int startOff,
-        byte[] data) throws IgniteCheckedException {
+        byte[] data, int blockSize) throws IgniteCheckedException {
         if (dataCachePrj.igfsDataSpaceUsed() >= dataCachePrj.igfsDataSpaceMax()) {
             final WriteCompletionFuture completionFut = pendingWrites.get(fileId);
 
@@ -1090,7 +1046,7 @@ public class IgfsDataManager extends IgfsManager {
 
         // If writing from block beginning, just put and return.
         if (startOff == 0) {
-            dataCachePrj.put(colocatedKey, data);
+            putBlock(blockSize, colocatedKey, data);
 
             return;
         }
@@ -1151,67 +1107,6 @@ public class IgfsDataManager extends IgfsManager {
     }
 
     /**
-     * Put data block read from the secondary file system to the cache.
-     *
-     * @param key Key.
-     * @param data Data.
-     * @throws IgniteCheckedException If failed.
-     */
-    private void putSafe(final IgfsBlockKey key, final byte[] data) throws IgniteCheckedException {
-        assert key != null;
-        assert data != null;
-
-        if (maxPendingPuts > 0) {
-            pendingPutsLock.lock();
-
-            try {
-                while (curPendingPuts > maxPendingPuts)
-                    pendingPutsCond.await(2000, TimeUnit.MILLISECONDS);
-
-                curPendingPuts += data.length;
-            }
-            catch (InterruptedException ignore) {
-                throw new IgniteCheckedException("Failed to put IGFS data block into cache due to interruption: " + key);
-            }
-            finally {
-                pendingPutsLock.unlock();
-            }
-        }
-
-        Runnable task = new Runnable() {
-            @Override public void run() {
-                try {
-                    dataCachePrj.put(key, data);
-                }
-                catch (IgniteCheckedException e) {
-                    U.warn(log, "Failed to put IGFS data block into cache [key=" + key + ", err=" + e + ']');
-                }
-                finally {
-                    if (maxPendingPuts > 0) {
-                        pendingPutsLock.lock();
-
-                        try {
-                            curPendingPuts -= data.length;
-
-                            pendingPutsCond.signalAll();
-                        }
-                        finally {
-                            pendingPutsLock.unlock();
-                        }
-                    }
-                }
-            }
-        };
-
-        try {
-            putExecSvc.submit(task);
-        }
-        catch (RejectedExecutionException ignore) {
-            task.run();
-        }
-    }
-
-    /**
      * @param blocks Blocks to write.
      * @return Future that will be completed after put is done.
      */
@@ -1261,6 +1156,7 @@ public class IgfsDataManager extends IgfsManager {
      * @param nodeId Node ID.
      * @param ackMsg Write acknowledgement message.
      */
+    @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
     private void processAckMessage(UUID nodeId, IgfsAckMessage ackMsg) {
         try {
             ackMsg.finishUnmarshal(igfsCtx.kernalContext().config().getMarshaller(), null);
@@ -1343,6 +1239,7 @@ public class IgfsDataManager extends IgfsManager {
          * @throws IgniteCheckedException If failed.
          * @return Data remainder if {@code flush} flag is {@code false}.
          */
+        @SuppressWarnings("ConstantConditions")
         @Nullable public byte[] storeDataBlocks(
             IgfsEntryInfo fileInfo,
             long reservedLen,
@@ -1456,7 +1353,7 @@ public class IgfsDataManager extends IgfsManager {
 
                 if (size != blockSize) {
                     // Partial writes must be always synchronous.
-                    processPartialBlockWrite(id, key, block == first ? off : 0, portion);
+                    processPartialBlockWrite(id, key, block == first ? off : 0, portion, blockSize);
 
                     writtenTotal++;
                 }
@@ -1617,8 +1514,6 @@ public class IgfsDataManager extends IgfsManager {
         protected AsyncDeleteWorker(@Nullable String gridName, String name, IgniteLogger log) {
             super(gridName, name, log);
 
-            long time = System.currentTimeMillis();
-
             stopInfo = IgfsUtils.createDirectory(IgniteUuid.randomUuid());
         }
 
@@ -1642,6 +1537,7 @@ public class IgfsDataManager extends IgfsManager {
         }
 
         /** {@inheritDoc} */
+        @SuppressWarnings("ConstantConditions")
         @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
             try {
                 while (!isCancelled()) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/98a0990c/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/data/IgfsDataPutProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/data/IgfsDataPutProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/data/IgfsDataPutProcessor.java
new file mode 100644
index 0000000..2029d4e
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/data/IgfsDataPutProcessor.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.igfs.data;
+
+import org.apache.ignite.binary.BinaryObjectException;
+import org.apache.ignite.binary.BinaryReader;
+import org.apache.ignite.binary.BinaryWriter;
+import org.apache.ignite.binary.Binarylizable;
+import org.apache.ignite.internal.processors.igfs.IgfsBlockKey;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+import javax.cache.processor.EntryProcessor;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.MutableEntry;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+
+/**
+ * Entry processor to set or replace block byte value.
+ */
+public class IgfsDataPutProcessor implements EntryProcessor<IgfsBlockKey, byte[], Void>, Externalizable, Binarylizable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** The new value. */
+    private byte[] newVal;
+
+    /**
+     * Non-arg constructor required by externalizable.
+     */
+    public IgfsDataPutProcessor() {
+        // no-op
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param newVal The new value.
+     */
+    public IgfsDataPutProcessor(byte[] newVal) {
+        assert newVal != null;
+
+        this.newVal = newVal;
+    }
+
+    /** {@inheritDoc} */
+    public Void process(MutableEntry<IgfsBlockKey, byte[]> entry, Object... args)
+        throws EntryProcessorException {
+        byte[] curVal = entry.getValue();
+
+        if (curVal == null || newVal.length > curVal.length)
+            entry.setValue(newVal);
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        newVal = U.readByteArray(in);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        U.writeByteArray(out, newVal);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readBinary(BinaryReader reader) throws BinaryObjectException {
+        newVal = reader.rawReader().readByteArray();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeBinary(BinaryWriter writer) throws BinaryObjectException {
+        writer.rawWriter().writeByteArray(newVal);
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(IgfsDataPutProcessor.class, this);
+    }
+}