You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/02/26 13:12:34 UTC
[06/17] ignite git commit: Fixed assertion in optimized marshaller
Fixed assertion in optimized marshaller
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ff80f5da
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ff80f5da
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ff80f5da
Branch: refs/heads/ignite-1232
Commit: ff80f5da911b4fedc5407d6cc22fcea5e4dd34bc
Parents: 9117720
Author: Valentin Kulichenko <va...@gmail.com>
Authored: Wed Feb 24 18:39:26 2016 -0800
Committer: Valentin Kulichenko <va...@gmail.com>
Committed: Wed Feb 24 18:39:26 2016 -0800
----------------------------------------------------------------------
.../OptimizedObjectStreamRegistry.java | 147 +++++++++++--------
.../OptimizedMarshallerPooledSelfTest.java | 2 +-
2 files changed, 86 insertions(+), 63 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/ff80f5da/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedObjectStreamRegistry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedObjectStreamRegistry.java b/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedObjectStreamRegistry.java
index cf92d27..fd1b917 100644
--- a/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedObjectStreamRegistry.java
+++ b/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedObjectStreamRegistry.java
@@ -33,8 +33,11 @@ class OptimizedObjectStreamRegistry {
/** Holders. */
private static final ThreadLocal<StreamHolder> holders = new ThreadLocal<>();
- /** Holders pool. */
- private static BlockingQueue<StreamHolder> pool;
+ /** Output streams pool. */
+ private static BlockingQueue<OptimizedObjectOutputStream> outPool;
+
+ /** Input streams pool. */
+ private static BlockingQueue<OptimizedObjectInputStream> inPool;
/**
* Ensures singleton.
@@ -50,16 +53,18 @@ class OptimizedObjectStreamRegistry {
*/
static void poolSize(int size) {
if (size > 0) {
- pool = new LinkedBlockingQueue<>(size);
+ outPool = new LinkedBlockingQueue<>(size);
+ inPool = new LinkedBlockingQueue<>(size);
for (int i = 0; i < size; i++) {
- boolean b = pool.offer(new StreamHolder());
-
- assert b;
+ outPool.offer(createOut());
+ inPool.offer(createIn());
}
}
- else
- pool = null;
+ else {
+ outPool = null;
+ inPool = null;
+ }
}
/**
@@ -69,7 +74,17 @@ class OptimizedObjectStreamRegistry {
* @throws org.apache.ignite.internal.IgniteInterruptedCheckedException If thread is interrupted while trying to take holder from pool.
*/
static OptimizedObjectOutputStream out() throws IgniteInterruptedCheckedException {
- return holder().acquireOut();
+ if (outPool != null) {
+ try {
+ return outPool.take();
+ }
+ catch (InterruptedException e) {
+ throw new IgniteInterruptedCheckedException(
+ "Failed to take output object stream from pool (thread interrupted).", e);
+ }
+ }
+ else
+ return holder().acquireOut();
}
/**
@@ -79,7 +94,17 @@ class OptimizedObjectStreamRegistry {
* @throws org.apache.ignite.internal.IgniteInterruptedCheckedException If thread is interrupted while trying to take holder from pool.
*/
static OptimizedObjectInputStream in() throws IgniteInterruptedCheckedException {
- return holder().acquireIn();
+ if (inPool != null) {
+ try {
+ return inPool.take();
+ }
+ catch (InterruptedException e) {
+ throw new IgniteInterruptedCheckedException(
+ "Failed to take input object stream from pool (thread interrupted).", e);
+ }
+ }
+ else
+ return holder().acquireIn();
}
/**
@@ -90,15 +115,17 @@ class OptimizedObjectStreamRegistry {
static void closeOut(OptimizedObjectOutputStream out) {
U.close(out, null);
- StreamHolder holder = holders.get();
-
- if (holder != null && holder.releaseOut() && pool != null) {
- holders.set(null);
-
- boolean b = pool.offer(holder);
+ if (outPool != null) {
+ boolean b = outPool.offer(out);
assert b;
}
+ else {
+ StreamHolder holder = holders.get();
+
+ if (holder != null)
+ holder.releaseOut();
+ }
}
/**
@@ -110,15 +137,17 @@ class OptimizedObjectStreamRegistry {
static void closeIn(OptimizedObjectInputStream in) {
U.close(in, null);
- StreamHolder holder = holders.get();
-
- if (holder != null && holder.releaseIn() && pool != null) {
- holders.set(null);
-
- boolean b = pool.offer(holder);
+ if (inPool != null) {
+ boolean b = inPool.offer(in);
assert b;
}
+ else {
+ StreamHolder holder = holders.get();
+
+ if (holder != null)
+ holder.releaseIn();
+ }
}
/**
@@ -130,19 +159,41 @@ class OptimizedObjectStreamRegistry {
private static StreamHolder holder() throws IgniteInterruptedCheckedException {
StreamHolder holder = holders.get();
- if (holder == null) {
- try {
- holders.set(holder = pool != null ? pool.take() : new StreamHolder());
- }
- catch (InterruptedException e) {
- throw new IgniteInterruptedCheckedException("Failed to take object stream from pool (thread interrupted).", e);
- }
- }
+ if (holder == null)
+ holders.set(holder = new StreamHolder());
return holder;
}
/**
+ * Creates output stream.
+ *
+ * @return Object output stream.
+ */
+ private static OptimizedObjectOutputStream createOut() {
+ try {
+ return new OptimizedObjectOutputStream(new GridUnsafeDataOutput(4 * 1024));
+ }
+ catch (IOException e) {
+ throw new IgniteException("Failed to create object output stream.", e);
+ }
+ }
+
+ /**
+ * Creates input stream.
+ *
+ * @return Object input stream.
+ */
+ private static OptimizedObjectInputStream createIn() {
+ try {
+ return new OptimizedObjectInputStream(new GridUnsafeDataInput());
+ }
+ catch (IOException e) {
+ throw new IgniteException("Failed to create object input stream.", e);
+ }
+ }
+
+ /**
* Streams holder.
*/
private static class StreamHolder {
@@ -179,43 +230,15 @@ class OptimizedObjectStreamRegistry {
/**
* Releases output stream.
*/
- boolean releaseOut() {
- return --outAcquireCnt == 0;
+ void releaseOut() {
+ outAcquireCnt--;
}
/**
* Releases input stream.
*/
- boolean releaseIn() {
- return --inAcquireCnt == 0;
- }
-
- /**
- * Creates output stream.
- *
- * @return Object output stream.
- */
- private OptimizedObjectOutputStream createOut() {
- try {
- return new OptimizedObjectOutputStream(new GridUnsafeDataOutput(4 * 1024));
- }
- catch (IOException e) {
- throw new IgniteException("Failed to create object output stream.", e);
- }
- }
-
- /**
- * Creates input stream.
- *
- * @return Object input stream.
- */
- private OptimizedObjectInputStream createIn() {
- try {
- return new OptimizedObjectInputStream(new GridUnsafeDataInput());
- }
- catch (IOException e) {
- throw new IgniteException("Failed to create object input stream.", e);
- }
+ void releaseIn() {
+ inAcquireCnt--;
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ff80f5da/modules/core/src/test/java/org/apache/ignite/marshaller/optimized/OptimizedMarshallerPooledSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/marshaller/optimized/OptimizedMarshallerPooledSelfTest.java b/modules/core/src/test/java/org/apache/ignite/marshaller/optimized/OptimizedMarshallerPooledSelfTest.java
index c649787..a883270 100644
--- a/modules/core/src/test/java/org/apache/ignite/marshaller/optimized/OptimizedMarshallerPooledSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/marshaller/optimized/OptimizedMarshallerPooledSelfTest.java
@@ -39,6 +39,6 @@ public class OptimizedMarshallerPooledSelfTest extends OptimizedMarshallerSelfTe
super.afterTestsStopped();
// Reset static registry.
- new OptimizedMarshaller().setPoolSize(-1);
+ new OptimizedMarshaller().setPoolSize(0);
}
}