You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by am...@apache.org on 2020/05/19 09:48:43 UTC

[ignite] 01/01: IGNITE-13032 Use non-static stream pool for optimized marshaller.

This is an automated email from the ASF dual-hosted git repository.

amashenkov pushed a commit to branch ignite-13032
in repository https://gitbox.apache.org/repos/asf/ignite.git

commit 6c2ee3a7f6f1e178d0db7205b3e225a9e1e6bfba
Author: Alexey Goncharuk <al...@gmail.com>
AuthorDate: Wed Apr 15 16:57:59 2020 +0300

    IGNITE-13032 Use non-static stream pool for optimized marshaller.
---
 .../marshaller/optimized/OptimizedMarshaller.java  |  23 +--
 .../OptimizedObjectPooledStreamRegistry.java       |  89 +++++++++++
 .../OptimizedObjectSharedStreamRegistry.java       | 122 +++++++++++++++
 .../optimized/OptimizedObjectStreamRegistry.java   | 170 +--------------------
 .../optimized/OptimizedObjectStreamSelfTest.java   |  20 +--
 .../ignite/testsuites/IgniteReproducingSuite.java  |   6 +-
 6 files changed, 246 insertions(+), 184 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/marshaller/optimized/OptimizedMarshaller.java b/modules/core/src/main/java/org/apache/ignite/internal/marshaller/optimized/OptimizedMarshaller.java
index c77cfa25..f5ae585 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/marshaller/optimized/OptimizedMarshaller.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/marshaller/optimized/OptimizedMarshaller.java
@@ -100,6 +100,9 @@ public class OptimizedMarshaller extends AbstractNodeNameAwareMarshaller {
     /** Class descriptors by class. */
     private final ConcurrentMap<Class, OptimizedClassDescriptor> clsMap = new ConcurrentHashMap<>();
 
+    /** */
+    private OptimizedObjectStreamRegistry registry = new OptimizedObjectSharedStreamRegistry();
+
     /**
      * Creates new marshaller will all defaults.
      *
@@ -162,7 +165,9 @@ public class OptimizedMarshaller extends AbstractNodeNameAwareMarshaller {
      * @return {@code this} for chaining.
      */
     public OptimizedMarshaller setPoolSize(int poolSize) {
-        OptimizedObjectStreamRegistry.poolSize(poolSize);
+        registry = poolSize > 0 ?
+            new OptimizedObjectPooledStreamRegistry(poolSize) :
+            new OptimizedObjectSharedStreamRegistry();
 
         return this;
     }
@@ -174,7 +179,7 @@ public class OptimizedMarshaller extends AbstractNodeNameAwareMarshaller {
         OptimizedObjectOutputStream objOut = null;
 
         try {
-            objOut = OptimizedObjectStreamRegistry.out();
+            objOut = registry.out();
 
             objOut.context(clsMap, ctx, mapper, requireSer);
 
@@ -186,7 +191,7 @@ public class OptimizedMarshaller extends AbstractNodeNameAwareMarshaller {
             throw new IgniteCheckedException("Failed to serialize object: " + obj, e);
         }
         finally {
-            OptimizedObjectStreamRegistry.closeOut(objOut);
+            registry.closeOut(objOut);
         }
     }
 
@@ -195,7 +200,7 @@ public class OptimizedMarshaller extends AbstractNodeNameAwareMarshaller {
         OptimizedObjectOutputStream objOut = null;
 
         try {
-            objOut = OptimizedObjectStreamRegistry.out();
+            objOut = registry.out();
 
             objOut.context(clsMap, ctx, mapper, requireSer);
 
@@ -207,7 +212,7 @@ public class OptimizedMarshaller extends AbstractNodeNameAwareMarshaller {
             throw new IgniteCheckedException("Failed to serialize object: " + obj, e);
         }
         finally {
-            OptimizedObjectStreamRegistry.closeOut(objOut);
+            registry.closeOut(objOut);
         }
     }
 
@@ -218,7 +223,7 @@ public class OptimizedMarshaller extends AbstractNodeNameAwareMarshaller {
         OptimizedObjectInputStream objIn = null;
 
         try {
-            objIn = OptimizedObjectStreamRegistry.in();
+            objIn = registry.in();
 
             objIn.context(clsMap, ctx, mapper, clsLdr != null ? clsLdr : dfltClsLdr);
 
@@ -237,7 +242,7 @@ public class OptimizedMarshaller extends AbstractNodeNameAwareMarshaller {
                 "[clsLdr=" + clsLdr + ", err=" + e.getMessage() + "]", e);
         }
         finally {
-            OptimizedObjectStreamRegistry.closeIn(objIn);
+            registry.closeIn(objIn);
         }
     }
 
@@ -248,7 +253,7 @@ public class OptimizedMarshaller extends AbstractNodeNameAwareMarshaller {
         OptimizedObjectInputStream objIn = null;
 
         try {
-            objIn = OptimizedObjectStreamRegistry.in();
+            objIn = registry.in();
 
             objIn.context(clsMap, ctx, mapper, clsLdr != null ? clsLdr : dfltClsLdr);
 
@@ -265,7 +270,7 @@ public class OptimizedMarshaller extends AbstractNodeNameAwareMarshaller {
             throw new IgniteCheckedException("Failed to deserialize object with given class loader: " + clsLdr, e);
         }
         finally {
-            OptimizedObjectStreamRegistry.closeIn(objIn);
+            registry.closeIn(objIn);
         }
     }
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/marshaller/optimized/OptimizedObjectPooledStreamRegistry.java b/modules/core/src/main/java/org/apache/ignite/internal/marshaller/optimized/OptimizedObjectPooledStreamRegistry.java
new file mode 100644
index 0000000..319ca5a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/marshaller/optimized/OptimizedObjectPooledStreamRegistry.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.marshaller.optimized;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ *
+ */
+public class OptimizedObjectPooledStreamRegistry extends OptimizedObjectStreamRegistry{
+    /** Output streams pool. */
+    private final BlockingQueue<OptimizedObjectOutputStream> outPool;
+
+    /** Input streams pool. */
+    private final BlockingQueue<OptimizedObjectInputStream> inPool;
+
+    /**
+     * @param size Pool size.
+     */
+    OptimizedObjectPooledStreamRegistry(int size) {
+        assert size > 0 : "size must be positive for pooled stream registry: " + size;
+
+        outPool = new LinkedBlockingQueue<>(size);
+        inPool = new LinkedBlockingQueue<>(size);
+
+        for (int i = 0; i < size; i++) {
+            outPool.offer(createOut());
+            inPool.offer(createIn());
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override OptimizedObjectOutputStream out() throws IgniteInterruptedCheckedException {
+        try {
+            return outPool.take();
+        }
+        catch (InterruptedException e) {
+            throw new IgniteInterruptedCheckedException(
+                "Failed to take output object stream from pool (thread interrupted).", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override OptimizedObjectInputStream in() throws IgniteInterruptedCheckedException {
+        try {
+            return inPool.take();
+        }
+        catch (InterruptedException e) {
+            throw new IgniteInterruptedCheckedException(
+                "Failed to take input object stream from pool (thread interrupted).", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override void closeOut(OptimizedObjectOutputStream out) {
+        U.close(out, null);
+
+        boolean b = outPool.offer(out);
+
+        assert b;
+    }
+
+    /** {@inheritDoc} */
+    @Override void closeIn(OptimizedObjectInputStream in) {
+        U.close(in, null);
+
+        boolean b = inPool.offer(in);
+
+        assert b;
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/marshaller/optimized/OptimizedObjectSharedStreamRegistry.java b/modules/core/src/main/java/org/apache/ignite/internal/marshaller/optimized/OptimizedObjectSharedStreamRegistry.java
new file mode 100644
index 0000000..ac28e40
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/marshaller/optimized/OptimizedObjectSharedStreamRegistry.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.marshaller.optimized;
+
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ *
+ */
+public class OptimizedObjectSharedStreamRegistry extends OptimizedObjectStreamRegistry {
+    /** */
+    private static final ThreadLocal<StreamHolder> holders = new ThreadLocal<>();
+
+    /** {@inheritDoc} */
+    @Override OptimizedObjectOutputStream out() {
+        return holder().acquireOut();
+    }
+
+    /** {@inheritDoc} */
+    @Override OptimizedObjectInputStream in() {
+        return holder().acquireIn();
+    }
+
+    /** {@inheritDoc} */
+    @Override void closeOut(OptimizedObjectOutputStream out) {
+        U.close(out, null);
+
+        StreamHolder holder = holders.get();
+
+        if (holder != null)
+            holder.releaseOut();
+    }
+
+    /** {@inheritDoc} */
+    @Override void closeIn(OptimizedObjectInputStream in) {
+        U.close(in, null);
+
+        StreamHolder holder = holders.get();
+
+        if (holder != null)
+            holder.releaseIn();
+    }
+
+    /**
+     * Gets holder from pool or thread local.
+     *
+     * @return Stream holder.
+     */
+    private static StreamHolder holder() {
+        StreamHolder holder = holders.get();
+
+        if (holder == null)
+            holders.set(holder = new StreamHolder());
+
+        return holder;
+    }
+
+    /**
+     * Streams holder.
+     */
+    private static class StreamHolder {
+        /** Output stream. */
+        private final OptimizedObjectOutputStream out = createOut();
+
+        /** Input stream. */
+        private final OptimizedObjectInputStream in = createIn();
+
+        /** Output streams counter. */
+        private int outAcquireCnt;
+
+        /** Input streams counter. */
+        private int inAcquireCnt;
+
+        /**
+         * Gets output stream.
+         *
+         * @return Object output stream.
+         */
+        OptimizedObjectOutputStream acquireOut() {
+            return outAcquireCnt++ > 0 ? createOut() : out;
+        }
+
+        /**
+         * Gets input stream.
+         *
+         * @return Object input stream.
+         */
+        OptimizedObjectInputStream acquireIn() {
+            return inAcquireCnt++ > 0 ? createIn() : in;
+        }
+
+        /**
+         * Releases output stream.
+         */
+        void releaseOut() {
+            outAcquireCnt--;
+        }
+
+        /**
+         * Releases input stream.
+         */
+        void releaseIn() {
+            inAcquireCnt--;
+        }
+    }
+
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/marshaller/optimized/OptimizedObjectStreamRegistry.java b/modules/core/src/main/java/org/apache/ignite/internal/marshaller/optimized/OptimizedObjectStreamRegistry.java
index f022973..158cb05 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/marshaller/optimized/OptimizedObjectStreamRegistry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/marshaller/optimized/OptimizedObjectStreamRegistry.java
@@ -18,74 +18,22 @@
 package org.apache.ignite.internal.marshaller.optimized;
 
 import java.io.IOException;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.util.io.GridUnsafeDataInput;
 import org.apache.ignite.internal.util.io.GridUnsafeDataOutput;
-import org.apache.ignite.internal.util.typedef.internal.U;
 
 /**
  * Storage for object streams.
  */
-class OptimizedObjectStreamRegistry {
-    /** Holders. */
-    private static final ThreadLocal<StreamHolder> holders = new ThreadLocal<>();
-
-    /** Output streams pool. */
-    private static BlockingQueue<OptimizedObjectOutputStream> outPool;
-
-    /** Input streams pool. */
-    private static BlockingQueue<OptimizedObjectInputStream> inPool;
-
-    /**
-     * Ensures singleton.
-     */
-    private OptimizedObjectStreamRegistry() {
-        // No-op.
-    }
-
-    /**
-     * Sets streams pool size.
-     *
-     * @param size Streams pool size.
-     */
-    static void poolSize(int size) {
-        if (size > 0) {
-            outPool = new LinkedBlockingQueue<>(size);
-            inPool = new LinkedBlockingQueue<>(size);
-
-            for (int i = 0; i < size; i++) {
-                outPool.offer(createOut());
-                inPool.offer(createIn());
-            }
-        }
-        else {
-            outPool = null;
-            inPool = null;
-        }
-    }
-
+abstract class OptimizedObjectStreamRegistry {
     /**
      * Gets output stream.
      *
      * @return Object output stream.
      * @throws org.apache.ignite.internal.IgniteInterruptedCheckedException If thread is interrupted while trying to take holder from pool.
      */
-    static OptimizedObjectOutputStream out() throws IgniteInterruptedCheckedException {
-        if (outPool != null) {
-            try {
-                return outPool.take();
-            }
-            catch (InterruptedException e) {
-                throw new IgniteInterruptedCheckedException(
-                    "Failed to take output object stream from pool (thread interrupted).", e);
-            }
-        }
-        else
-            return holder().acquireOut();
-    }
+    abstract OptimizedObjectOutputStream out() throws IgniteInterruptedCheckedException;
 
     /**
      * Gets input stream.
@@ -93,83 +41,28 @@ class OptimizedObjectStreamRegistry {
      * @return Object input stream.
      * @throws org.apache.ignite.internal.IgniteInterruptedCheckedException If thread is interrupted while trying to take holder from pool.
      */
-    static OptimizedObjectInputStream in() throws IgniteInterruptedCheckedException {
-        if (inPool != null) {
-            try {
-                return inPool.take();
-            }
-            catch (InterruptedException e) {
-                throw new IgniteInterruptedCheckedException(
-                    "Failed to take input object stream from pool (thread interrupted).", e);
-            }
-        }
-        else
-            return holder().acquireIn();
-    }
+    abstract OptimizedObjectInputStream in() throws IgniteInterruptedCheckedException;
 
     /**
      * Closes and releases output stream.
      *
      * @param out Object output stream.
      */
-    static void closeOut(OptimizedObjectOutputStream out) {
-        U.close(out, null);
-
-        if (outPool != null) {
-            boolean b = outPool.offer(out);
-
-            assert b;
-        }
-        else {
-            StreamHolder holder = holders.get();
-
-            if (holder != null)
-                holder.releaseOut();
-        }
-    }
+    abstract void closeOut(OptimizedObjectOutputStream out);
 
     /**
      * Closes and releases input stream.
      *
      * @param in Object input stream.
      */
-    static void closeIn(OptimizedObjectInputStream in) {
-        U.close(in, null);
-
-        if (inPool != null) {
-            boolean b = inPool.offer(in);
-
-            assert b;
-        }
-        else {
-            StreamHolder holder = holders.get();
-
-            if (holder != null)
-                holder.releaseIn();
-        }
-    }
-
-    /**
-     * Gets holder from pool or thread local.
-     *
-     * @return Stream holder.
-     * @throws org.apache.ignite.internal.IgniteInterruptedCheckedException If thread is interrupted while trying to take holder from pool.
-     */
-    private static StreamHolder holder() throws IgniteInterruptedCheckedException {
-        StreamHolder holder = holders.get();
-
-        if (holder == null)
-            holders.set(holder = new StreamHolder());
-
-        return holder;
-    }
+    abstract void closeIn(OptimizedObjectInputStream in);
 
     /**
      * Creates output stream.
      *
      * @return Object output stream.
      */
-    private static OptimizedObjectOutputStream createOut() {
+    static OptimizedObjectOutputStream createOut() {
         try {
             return new OptimizedObjectOutputStream(new GridUnsafeDataOutput(4 * 1024));
         }
@@ -183,7 +76,7 @@ class OptimizedObjectStreamRegistry {
      *
      * @return Object input stream.
      */
-    private static OptimizedObjectInputStream createIn() {
+    static OptimizedObjectInputStream createIn() {
         try {
             return new OptimizedObjectInputStream(new GridUnsafeDataInput());
         }
@@ -191,53 +84,4 @@ class OptimizedObjectStreamRegistry {
             throw new IgniteException("Failed to create object input stream.", e);
         }
     }
-
-    /**
-     * Streams holder.
-     */
-    private static class StreamHolder {
-        /** Output stream. */
-        private final OptimizedObjectOutputStream out = createOut();
-
-        /** Input stream. */
-        private final OptimizedObjectInputStream in = createIn();
-
-        /** Output streams counter. */
-        private int outAcquireCnt;
-
-        /** Input streams counter. */
-        private int inAcquireCnt;
-
-        /**
-         * Gets output stream.
-         *
-         * @return Object output stream.
-         */
-        OptimizedObjectOutputStream acquireOut() {
-            return outAcquireCnt++ > 0 ? createOut() : out;
-        }
-
-        /**
-         * Gets input stream.
-         *
-         * @return Object input stream.
-         */
-        OptimizedObjectInputStream acquireIn() {
-            return inAcquireCnt++ > 0 ? createIn() : in;
-        }
-
-        /**
-         * Releases output stream.
-         */
-        void releaseOut() {
-            outAcquireCnt--;
-        }
-
-        /**
-         * Releases input stream.
-         */
-        void releaseIn() {
-            inAcquireCnt--;
-        }
-    }
 }
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/marshaller/optimized/OptimizedObjectStreamSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/marshaller/optimized/OptimizedObjectStreamSelfTest.java
index 4b48a22..6a8dc50 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/marshaller/optimized/OptimizedObjectStreamSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/marshaller/optimized/OptimizedObjectStreamSelfTest.java
@@ -1026,7 +1026,9 @@ public class OptimizedObjectStreamSelfTest extends GridCommonAbstractTest {
      */
     @Test
     public void testReadToArray() throws Exception {
-        OptimizedObjectInputStream in = OptimizedObjectStreamRegistry.in();
+        OptimizedObjectStreamRegistry reg = new OptimizedObjectSharedStreamRegistry();
+
+        OptimizedObjectInputStream in = reg.in();
 
         try {
             byte[] arr = new byte[50];
@@ -1065,7 +1067,7 @@ public class OptimizedObjectStreamSelfTest extends GridCommonAbstractTest {
                 assertEquals(i < 10 ? 40 + i : 0, buf[i]);
         }
         finally {
-            OptimizedObjectStreamRegistry.closeIn(in);
+            reg.closeIn(in);
         }
     }
 
@@ -1178,8 +1180,10 @@ public class OptimizedObjectStreamSelfTest extends GridCommonAbstractTest {
         OptimizedObjectOutputStream out = null;
         OptimizedObjectInputStream in = null;
 
+        OptimizedObjectStreamRegistry reg = new OptimizedObjectSharedStreamRegistry();
+
         try {
-            out = OptimizedObjectStreamRegistry.out();
+            out = reg.out();
 
             out.context(clsMap, CTX, null, true);
 
@@ -1187,7 +1191,7 @@ public class OptimizedObjectStreamSelfTest extends GridCommonAbstractTest {
 
             byte[] arr = out.out().array();
 
-            in = OptimizedObjectStreamRegistry.in();
+            in = reg.in();
 
             in.context(clsMap, CTX, null, getClass().getClassLoader());
 
@@ -1200,8 +1204,8 @@ public class OptimizedObjectStreamSelfTest extends GridCommonAbstractTest {
             return (T)obj0;
         }
         finally {
-            OptimizedObjectStreamRegistry.closeOut(out);
-            OptimizedObjectStreamRegistry.closeIn(in);
+            reg.closeOut(out);
+            reg.closeIn(in);
         }
     }
 
@@ -1210,10 +1214,8 @@ public class OptimizedObjectStreamSelfTest extends GridCommonAbstractTest {
      *
      * @param out Output stream.
      * @param in Input stream.
-     * @throws Exception If failed.
      */
-    private void checkHandles(OptimizedObjectOutputStream out, OptimizedObjectInputStream in)
-        throws Exception {
+    private void checkHandles(OptimizedObjectOutputStream out, OptimizedObjectInputStream in) {
         Object[] outHandles = out.handledObjects();
         Object[] inHandles = in.handledObjects();
 
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteReproducingSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteReproducingSuite.java
index 5bc41f4..3eb911a 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteReproducingSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteReproducingSuite.java
@@ -19,7 +19,7 @@ package org.apache.ignite.testsuites;
 
 import java.util.ArrayList;
 import java.util.List;
-import org.apache.ignite.cache.RemoveAllDeadlockTest;
+import org.apache.ignite.internal.marshaller.optimized.OptimizedMarshallerPooledSelfTest;
 import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -49,8 +49,8 @@ public class IgniteReproducingSuite {
             suite.add(IgniteReproducingSuite.TestStub.class);
 
             //uncomment to add some test
-            for (int i = 0; i < 50; i++)
-                suite.add(RemoveAllDeadlockTest.class);
+            for (int i = 0; i < 500; i++)
+                suite.add(OptimizedMarshallerPooledSelfTest.class);
 
             return suite;
         }