You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/02/26 14:16:40 UTC

[22/35] ignite git commit: Fixed assertion in optimized marshaller

Fixed assertion in optimized marshaller


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ff80f5da
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ff80f5da
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ff80f5da

Branch: refs/heads/ignite-2407
Commit: ff80f5da911b4fedc5407d6cc22fcea5e4dd34bc
Parents: 9117720
Author: Valentin Kulichenko <va...@gmail.com>
Authored: Wed Feb 24 18:39:26 2016 -0800
Committer: Valentin Kulichenko <va...@gmail.com>
Committed: Wed Feb 24 18:39:26 2016 -0800

----------------------------------------------------------------------
 .../OptimizedObjectStreamRegistry.java          | 147 +++++++++++--------
 .../OptimizedMarshallerPooledSelfTest.java      |   2 +-
 2 files changed, 86 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/ff80f5da/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedObjectStreamRegistry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedObjectStreamRegistry.java b/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedObjectStreamRegistry.java
index cf92d27..fd1b917 100644
--- a/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedObjectStreamRegistry.java
+++ b/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedObjectStreamRegistry.java
@@ -33,8 +33,11 @@ class OptimizedObjectStreamRegistry {
     /** Holders. */
     private static final ThreadLocal<StreamHolder> holders = new ThreadLocal<>();
 
-    /** Holders pool. */
-    private static BlockingQueue<StreamHolder> pool;
+    /** Output streams pool. */
+    private static BlockingQueue<OptimizedObjectOutputStream> outPool;
+
+    /** Input streams pool. */
+    private static BlockingQueue<OptimizedObjectInputStream> inPool;
 
     /**
      * Ensures singleton.
@@ -50,16 +53,18 @@ class OptimizedObjectStreamRegistry {
      */
     static void poolSize(int size) {
         if (size > 0) {
-            pool = new LinkedBlockingQueue<>(size);
+            outPool = new LinkedBlockingQueue<>(size);
+            inPool = new LinkedBlockingQueue<>(size);
 
             for (int i = 0; i < size; i++) {
-                boolean b = pool.offer(new StreamHolder());
-
-                assert b;
+                outPool.offer(createOut());
+                inPool.offer(createIn());
             }
         }
-        else
-            pool = null;
+        else {
+            outPool = null;
+            inPool = null;
+        }
     }
 
     /**
@@ -69,7 +74,17 @@ class OptimizedObjectStreamRegistry {
      * @throws org.apache.ignite.internal.IgniteInterruptedCheckedException If thread is interrupted while trying to take holder from pool.
      */
     static OptimizedObjectOutputStream out() throws IgniteInterruptedCheckedException {
-        return holder().acquireOut();
+        if (outPool != null) {
+            try {
+                return outPool.take();
+            }
+            catch (InterruptedException e) {
+                throw new IgniteInterruptedCheckedException(
+                    "Failed to take output object stream from pool (thread interrupted).", e);
+            }
+        }
+        else
+            return holder().acquireOut();
     }
 
     /**
@@ -79,7 +94,17 @@ class OptimizedObjectStreamRegistry {
      * @throws org.apache.ignite.internal.IgniteInterruptedCheckedException If thread is interrupted while trying to take holder from pool.
      */
     static OptimizedObjectInputStream in() throws IgniteInterruptedCheckedException {
-        return holder().acquireIn();
+        if (inPool != null) {
+            try {
+                return inPool.take();
+            }
+            catch (InterruptedException e) {
+                throw new IgniteInterruptedCheckedException(
+                    "Failed to take input object stream from pool (thread interrupted).", e);
+            }
+        }
+        else
+            return holder().acquireIn();
     }
 
     /**
@@ -90,15 +115,17 @@ class OptimizedObjectStreamRegistry {
     static void closeOut(OptimizedObjectOutputStream out) {
         U.close(out, null);
 
-        StreamHolder holder = holders.get();
-
-        if (holder != null && holder.releaseOut() && pool != null) {
-            holders.set(null);
-
-            boolean b = pool.offer(holder);
+        if (outPool != null) {
+            boolean b = outPool.offer(out);
 
             assert b;
         }
+        else {
+            StreamHolder holder = holders.get();
+
+            if (holder != null)
+                holder.releaseOut();
+        }
     }
 
     /**
@@ -110,15 +137,17 @@ class OptimizedObjectStreamRegistry {
     static void closeIn(OptimizedObjectInputStream in) {
         U.close(in, null);
 
-        StreamHolder holder = holders.get();
-
-        if (holder != null && holder.releaseIn() && pool != null) {
-            holders.set(null);
-
-            boolean b = pool.offer(holder);
+        if (inPool != null) {
+            boolean b = inPool.offer(in);
 
             assert b;
         }
+        else {
+            StreamHolder holder = holders.get();
+
+            if (holder != null)
+                holder.releaseIn();
+        }
     }
 
     /**
@@ -130,19 +159,41 @@ class OptimizedObjectStreamRegistry {
     private static StreamHolder holder() throws IgniteInterruptedCheckedException {
         StreamHolder holder = holders.get();
 
-        if (holder == null) {
-            try {
-                holders.set(holder = pool != null ? pool.take() : new StreamHolder());
-            }
-            catch (InterruptedException e) {
-                throw new IgniteInterruptedCheckedException("Failed to take object stream from pool (thread interrupted).", e);
-            }
-        }
+        if (holder == null)
+            holders.set(holder = new StreamHolder());
 
         return holder;
     }
 
     /**
+     * Creates output stream.
+     *
+     * @return Object output stream.
+     */
+    private static OptimizedObjectOutputStream createOut() {
+        try {
+            return new OptimizedObjectOutputStream(new GridUnsafeDataOutput(4 * 1024));
+        }
+        catch (IOException e) {
+            throw new IgniteException("Failed to create object output stream.", e);
+        }
+    }
+
+    /**
+     * Creates input stream.
+     *
+     * @return Object input stream.
+     */
+    private static OptimizedObjectInputStream createIn() {
+        try {
+            return new OptimizedObjectInputStream(new GridUnsafeDataInput());
+        }
+        catch (IOException e) {
+            throw new IgniteException("Failed to create object input stream.", e);
+        }
+    }
+
+    /**
      * Streams holder.
      */
     private static class StreamHolder {
@@ -179,43 +230,15 @@ class OptimizedObjectStreamRegistry {
         /**
          * Releases output stream.
          */
-        boolean releaseOut() {
-            return --outAcquireCnt == 0;
+        void releaseOut() {
+            outAcquireCnt--;
         }
 
         /**
          * Releases input stream.
          */
-        boolean releaseIn() {
-            return --inAcquireCnt == 0;
-        }
-
-        /**
-         * Creates output stream.
-         *
-         * @return Object output stream.
-         */
-        private OptimizedObjectOutputStream createOut() {
-            try {
-                return new OptimizedObjectOutputStream(new GridUnsafeDataOutput(4 * 1024));
-            }
-            catch (IOException e) {
-                throw new IgniteException("Failed to create object output stream.", e);
-            }
-        }
-
-        /**
-         * Creates input stream.
-         *
-         * @return Object input stream.
-         */
-        private OptimizedObjectInputStream createIn() {
-            try {
-                return new OptimizedObjectInputStream(new GridUnsafeDataInput());
-            }
-            catch (IOException e) {
-                throw new IgniteException("Failed to create object input stream.", e);
-            }
+        void releaseIn() {
+            inAcquireCnt--;
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/ff80f5da/modules/core/src/test/java/org/apache/ignite/marshaller/optimized/OptimizedMarshallerPooledSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/marshaller/optimized/OptimizedMarshallerPooledSelfTest.java b/modules/core/src/test/java/org/apache/ignite/marshaller/optimized/OptimizedMarshallerPooledSelfTest.java
index c649787..a883270 100644
--- a/modules/core/src/test/java/org/apache/ignite/marshaller/optimized/OptimizedMarshallerPooledSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/marshaller/optimized/OptimizedMarshallerPooledSelfTest.java
@@ -39,6 +39,6 @@ public class OptimizedMarshallerPooledSelfTest extends OptimizedMarshallerSelfTe
         super.afterTestsStopped();
 
         // Reset static registry.
-        new OptimizedMarshaller().setPoolSize(-1);
+        new OptimizedMarshaller().setPoolSize(0);
     }
 }