You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/09/06 14:39:48 UTC

[26/50] [abbrv] ignite git commit: Fixed NPE in OptimizedMarshaller when streams pool is used

Fixed NPE in OptimizedMarshaller when streams pool is used


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/27f6c586
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/27f6c586
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/27f6c586

Branch: refs/heads/ignite-1.5.31-1
Commit: 27f6c5862940948669b6445a6839501327dacbb2
Parents: 4d2be72
Author: Valentin Kulichenko <va...@gmail.com>
Authored: Wed Jul 13 14:37:23 2016 -0700
Committer: Valentin Kulichenko <va...@gmail.com>
Committed: Wed Jul 13 14:37:23 2016 -0700

----------------------------------------------------------------------
 .../OptimizedObjectStreamRegistry.java          | 145 +++++++++++--------
 .../OptimizedMarshallerPooledSelfTest.java      |  44 ++++++
 .../testsuites/IgniteBinaryBasicTestSuite.java  |   2 +
 .../IgniteMarshallerSelfTestSuite.java          |   2 +
 4 files changed, 130 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/27f6c586/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedObjectStreamRegistry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedObjectStreamRegistry.java b/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedObjectStreamRegistry.java
index e0e4872..fd1b917 100644
--- a/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedObjectStreamRegistry.java
+++ b/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedObjectStreamRegistry.java
@@ -33,8 +33,11 @@ class OptimizedObjectStreamRegistry {
     /** Holders. */
     private static final ThreadLocal<StreamHolder> holders = new ThreadLocal<>();
 
-    /** Holders pool. */
-    private static BlockingQueue<StreamHolder> pool;
+    /** Output streams pool. */
+    private static BlockingQueue<OptimizedObjectOutputStream> outPool;
+
+    /** Input streams pool. */
+    private static BlockingQueue<OptimizedObjectInputStream> inPool;
 
     /**
      * Ensures singleton.
@@ -50,16 +53,18 @@ class OptimizedObjectStreamRegistry {
      */
     static void poolSize(int size) {
         if (size > 0) {
-            pool = new LinkedBlockingQueue<>(size);
+            outPool = new LinkedBlockingQueue<>(size);
+            inPool = new LinkedBlockingQueue<>(size);
 
             for (int i = 0; i < size; i++) {
-                boolean b = pool.offer(new StreamHolder());
-
-                assert b;
+                outPool.offer(createOut());
+                inPool.offer(createIn());
             }
         }
-        else
-            pool = null;
+        else {
+            outPool = null;
+            inPool = null;
+        }
     }
 
     /**
@@ -69,7 +74,17 @@ class OptimizedObjectStreamRegistry {
      * @throws org.apache.ignite.internal.IgniteInterruptedCheckedException If thread is interrupted while trying to take holder from pool.
      */
     static OptimizedObjectOutputStream out() throws IgniteInterruptedCheckedException {
-        return holder().acquireOut();
+        if (outPool != null) {
+            try {
+                return outPool.take();
+            }
+            catch (InterruptedException e) {
+                throw new IgniteInterruptedCheckedException(
+                    "Failed to take output object stream from pool (thread interrupted).", e);
+            }
+        }
+        else
+            return holder().acquireOut();
     }
 
     /**
@@ -79,7 +94,17 @@ class OptimizedObjectStreamRegistry {
      * @throws org.apache.ignite.internal.IgniteInterruptedCheckedException If thread is interrupted while trying to take holder from pool.
      */
     static OptimizedObjectInputStream in() throws IgniteInterruptedCheckedException {
-        return holder().acquireIn();
+        if (inPool != null) {
+            try {
+                return inPool.take();
+            }
+            catch (InterruptedException e) {
+                throw new IgniteInterruptedCheckedException(
+                    "Failed to take input object stream from pool (thread interrupted).", e);
+            }
+        }
+        else
+            return holder().acquireIn();
     }
 
     /**
@@ -90,17 +115,17 @@ class OptimizedObjectStreamRegistry {
     static void closeOut(OptimizedObjectOutputStream out) {
         U.close(out, null);
 
-        StreamHolder holder = holders.get();
-
-        holder.releaseOut();
-
-        if (pool != null) {
-            holders.remove();
-
-            boolean b = pool.offer(holder);
+        if (outPool != null) {
+            boolean b = outPool.offer(out);
 
             assert b;
         }
+        else {
+            StreamHolder holder = holders.get();
+
+            if (holder != null)
+                holder.releaseOut();
+        }
     }
 
     /**
@@ -112,17 +137,17 @@ class OptimizedObjectStreamRegistry {
     static void closeIn(OptimizedObjectInputStream in) {
         U.close(in, null);
 
-        StreamHolder holder = holders.get();
-
-        holder.releaseIn();
-
-        if (pool != null) {
-            holders.remove();
-
-            boolean b = pool.offer(holder);
+        if (inPool != null) {
+            boolean b = inPool.offer(in);
 
             assert b;
         }
+        else {
+            StreamHolder holder = holders.get();
+
+            if (holder != null)
+                holder.releaseIn();
+        }
     }
 
     /**
@@ -134,19 +159,41 @@ class OptimizedObjectStreamRegistry {
     private static StreamHolder holder() throws IgniteInterruptedCheckedException {
         StreamHolder holder = holders.get();
 
-        if (holder == null) {
-            try {
-                holders.set(holder = pool != null ? pool.take() : new StreamHolder());
-            }
-            catch (InterruptedException e) {
-                throw new IgniteInterruptedCheckedException("Failed to take object stream from pool (thread interrupted).", e);
-            }
-        }
+        if (holder == null)
+            holders.set(holder = new StreamHolder());
 
         return holder;
     }
 
     /**
+     * Creates output stream.
+     *
+     * @return Object output stream.
+     */
+    private static OptimizedObjectOutputStream createOut() {
+        try {
+            return new OptimizedObjectOutputStream(new GridUnsafeDataOutput(4 * 1024));
+        }
+        catch (IOException e) {
+            throw new IgniteException("Failed to create object output stream.", e);
+        }
+    }
+
+    /**
+     * Creates input stream.
+     *
+     * @return Object input stream.
+     */
+    private static OptimizedObjectInputStream createIn() {
+        try {
+            return new OptimizedObjectInputStream(new GridUnsafeDataInput());
+        }
+        catch (IOException e) {
+            throw new IgniteException("Failed to create object input stream.", e);
+        }
+    }
+
+    /**
      * Streams holder.
      */
     private static class StreamHolder {
@@ -193,33 +240,5 @@ class OptimizedObjectStreamRegistry {
         void releaseIn() {
             inAcquireCnt--;
         }
-
-        /**
-         * Creates output stream.
-         *
-         * @return Object output stream.
-         */
-        private OptimizedObjectOutputStream createOut() {
-            try {
-                return new OptimizedObjectOutputStream(new GridUnsafeDataOutput(4 * 1024));
-            }
-            catch (IOException e) {
-                throw new IgniteException("Failed to create object output stream.", e);
-            }
-        }
-
-        /**
-         * Creates input stream.
-         *
-         * @return Object input stream.
-         */
-        private OptimizedObjectInputStream createIn() {
-            try {
-                return new OptimizedObjectInputStream(new GridUnsafeDataInput());
-            }
-            catch (IOException e) {
-                throw new IgniteException("Failed to create object input stream.", e);
-            }
-        }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/27f6c586/modules/core/src/test/java/org/apache/ignite/marshaller/optimized/OptimizedMarshallerPooledSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/marshaller/optimized/OptimizedMarshallerPooledSelfTest.java b/modules/core/src/test/java/org/apache/ignite/marshaller/optimized/OptimizedMarshallerPooledSelfTest.java
new file mode 100644
index 0000000..a883270
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/marshaller/optimized/OptimizedMarshallerPooledSelfTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.marshaller.optimized;
+
+import org.apache.ignite.marshaller.Marshaller;
+import org.apache.ignite.testframework.junits.common.GridCommonTest;
+
+/**
+ * Optimized marshaller self test.
+ */
+@GridCommonTest(group = "Marshaller")
+public class OptimizedMarshallerPooledSelfTest extends OptimizedMarshallerSelfTest {
+    /** {@inheritDoc} */
+    @Override protected Marshaller marshaller() {
+        OptimizedMarshaller m = new OptimizedMarshaller(false);
+
+        m.setPoolSize(8);
+
+        return m;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        super.afterTestsStopped();
+
+        // Reset static registry.
+        new OptimizedMarshaller().setPoolSize(0);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/27f6c586/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBinaryBasicTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBinaryBasicTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBinaryBasicTestSuite.java
index 2aabf4f..734e199 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBinaryBasicTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBinaryBasicTestSuite.java
@@ -40,6 +40,7 @@ import org.apache.ignite.marshaller.DynamicProxySerializationMultiJvmSelfTest;
 import org.apache.ignite.marshaller.jdk.GridJdkMarshallerSelfTest;
 import org.apache.ignite.marshaller.optimized.OptimizedMarshallerEnumSelfTest;
 import org.apache.ignite.marshaller.optimized.OptimizedMarshallerNodeFailoverTest;
+import org.apache.ignite.marshaller.optimized.OptimizedMarshallerPooledSelfTest;
 import org.apache.ignite.marshaller.optimized.OptimizedMarshallerSelfTest;
 import org.apache.ignite.marshaller.optimized.OptimizedMarshallerSerialPersistentFieldsSelfTest;
 import org.apache.ignite.marshaller.optimized.OptimizedMarshallerTest;
@@ -88,6 +89,7 @@ public class IgniteBinaryBasicTestSuite extends TestSuite {
         ignoredTests.add(GridDeploymentMessageCountSelfTest.class);
         ignoredTests.add(DynamicProxySerializationMultiJvmSelfTest.class);
         ignoredTests.add(GridHandleTableSelfTest.class);
+        ignoredTests.add(OptimizedMarshallerPooledSelfTest.class);
 
         // TODO: check and delete if pass.
         ignoredTests.add(IgniteDaemonNodeMarshallerCacheTest.class);

http://git-wip-us.apache.org/repos/asf/ignite/blob/27f6c586/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteMarshallerSelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteMarshallerSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteMarshallerSelfTestSuite.java
index ec0ec23..99cbf60 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteMarshallerSelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteMarshallerSelfTestSuite.java
@@ -24,6 +24,7 @@ import org.apache.ignite.internal.util.io.GridUnsafeDataOutputArraySizingSelfTes
 import org.apache.ignite.marshaller.jdk.GridJdkMarshallerSelfTest;
 import org.apache.ignite.marshaller.optimized.OptimizedMarshallerEnumSelfTest;
 import org.apache.ignite.marshaller.optimized.OptimizedMarshallerNodeFailoverTest;
+import org.apache.ignite.marshaller.optimized.OptimizedMarshallerPooledSelfTest;
 import org.apache.ignite.marshaller.optimized.OptimizedMarshallerSelfTest;
 import org.apache.ignite.marshaller.optimized.OptimizedMarshallerSerialPersistentFieldsSelfTest;
 import org.apache.ignite.marshaller.optimized.OptimizedMarshallerTest;
@@ -59,6 +60,7 @@ public class IgniteMarshallerSelfTestSuite extends TestSuite {
         GridTestUtils.addTestIfNeeded(suite, OptimizedMarshallerNodeFailoverTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, OptimizedMarshallerSerialPersistentFieldsSelfTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, GridHandleTableSelfTest.class, ignoredTests);
+        GridTestUtils.addTestIfNeeded(suite, OptimizedMarshallerPooledSelfTest.class, ignoredTests);
 
         return suite;
     }