You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by am...@apache.org on 2020/05/19 09:48:43 UTC
[ignite] 01/01: IGNITE-13032 Use non-static stream pool for
optimized marshaller.
This is an automated email from the ASF dual-hosted git repository.
amashenkov pushed a commit to branch ignite-13032
in repository https://gitbox.apache.org/repos/asf/ignite.git
commit 6c2ee3a7f6f1e178d0db7205b3e225a9e1e6bfba
Author: Alexey Goncharuk <al...@gmail.com>
AuthorDate: Wed Apr 15 16:57:59 2020 +0300
IGNITE-13032 Use non-static stream pool for optimized marshaller.
---
.../marshaller/optimized/OptimizedMarshaller.java | 23 +--
.../OptimizedObjectPooledStreamRegistry.java | 89 +++++++++++
.../OptimizedObjectSharedStreamRegistry.java | 122 +++++++++++++++
.../optimized/OptimizedObjectStreamRegistry.java | 170 +--------------------
.../optimized/OptimizedObjectStreamSelfTest.java | 20 +--
.../ignite/testsuites/IgniteReproducingSuite.java | 6 +-
6 files changed, 246 insertions(+), 184 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/marshaller/optimized/OptimizedMarshaller.java b/modules/core/src/main/java/org/apache/ignite/internal/marshaller/optimized/OptimizedMarshaller.java
index c77cfa25..f5ae585 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/marshaller/optimized/OptimizedMarshaller.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/marshaller/optimized/OptimizedMarshaller.java
@@ -100,6 +100,9 @@ public class OptimizedMarshaller extends AbstractNodeNameAwareMarshaller {
/** Class descriptors by class. */
private final ConcurrentMap<Class, OptimizedClassDescriptor> clsMap = new ConcurrentHashMap<>();
+ /** */
+ private OptimizedObjectStreamRegistry registry = new OptimizedObjectSharedStreamRegistry();
+
/**
* Creates new marshaller will all defaults.
*
@@ -162,7 +165,9 @@ public class OptimizedMarshaller extends AbstractNodeNameAwareMarshaller {
* @return {@code this} for chaining.
*/
public OptimizedMarshaller setPoolSize(int poolSize) {
- OptimizedObjectStreamRegistry.poolSize(poolSize);
+ registry = poolSize > 0 ?
+ new OptimizedObjectPooledStreamRegistry(poolSize) :
+ new OptimizedObjectSharedStreamRegistry();
return this;
}
@@ -174,7 +179,7 @@ public class OptimizedMarshaller extends AbstractNodeNameAwareMarshaller {
OptimizedObjectOutputStream objOut = null;
try {
- objOut = OptimizedObjectStreamRegistry.out();
+ objOut = registry.out();
objOut.context(clsMap, ctx, mapper, requireSer);
@@ -186,7 +191,7 @@ public class OptimizedMarshaller extends AbstractNodeNameAwareMarshaller {
throw new IgniteCheckedException("Failed to serialize object: " + obj, e);
}
finally {
- OptimizedObjectStreamRegistry.closeOut(objOut);
+ registry.closeOut(objOut);
}
}
@@ -195,7 +200,7 @@ public class OptimizedMarshaller extends AbstractNodeNameAwareMarshaller {
OptimizedObjectOutputStream objOut = null;
try {
- objOut = OptimizedObjectStreamRegistry.out();
+ objOut = registry.out();
objOut.context(clsMap, ctx, mapper, requireSer);
@@ -207,7 +212,7 @@ public class OptimizedMarshaller extends AbstractNodeNameAwareMarshaller {
throw new IgniteCheckedException("Failed to serialize object: " + obj, e);
}
finally {
- OptimizedObjectStreamRegistry.closeOut(objOut);
+ registry.closeOut(objOut);
}
}
@@ -218,7 +223,7 @@ public class OptimizedMarshaller extends AbstractNodeNameAwareMarshaller {
OptimizedObjectInputStream objIn = null;
try {
- objIn = OptimizedObjectStreamRegistry.in();
+ objIn = registry.in();
objIn.context(clsMap, ctx, mapper, clsLdr != null ? clsLdr : dfltClsLdr);
@@ -237,7 +242,7 @@ public class OptimizedMarshaller extends AbstractNodeNameAwareMarshaller {
"[clsLdr=" + clsLdr + ", err=" + e.getMessage() + "]", e);
}
finally {
- OptimizedObjectStreamRegistry.closeIn(objIn);
+ registry.closeIn(objIn);
}
}
@@ -248,7 +253,7 @@ public class OptimizedMarshaller extends AbstractNodeNameAwareMarshaller {
OptimizedObjectInputStream objIn = null;
try {
- objIn = OptimizedObjectStreamRegistry.in();
+ objIn = registry.in();
objIn.context(clsMap, ctx, mapper, clsLdr != null ? clsLdr : dfltClsLdr);
@@ -265,7 +270,7 @@ public class OptimizedMarshaller extends AbstractNodeNameAwareMarshaller {
throw new IgniteCheckedException("Failed to deserialize object with given class loader: " + clsLdr, e);
}
finally {
- OptimizedObjectStreamRegistry.closeIn(objIn);
+ registry.closeIn(objIn);
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/marshaller/optimized/OptimizedObjectPooledStreamRegistry.java b/modules/core/src/main/java/org/apache/ignite/internal/marshaller/optimized/OptimizedObjectPooledStreamRegistry.java
new file mode 100644
index 0000000..319ca5a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/marshaller/optimized/OptimizedObjectPooledStreamRegistry.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.marshaller.optimized;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ *
+ */
+public class OptimizedObjectPooledStreamRegistry extends OptimizedObjectStreamRegistry{
+ /** Output streams pool. */
+ private final BlockingQueue<OptimizedObjectOutputStream> outPool;
+
+ /** Input streams pool. */
+ private final BlockingQueue<OptimizedObjectInputStream> inPool;
+
+ /**
+ * @param size Pool size.
+ */
+ OptimizedObjectPooledStreamRegistry(int size) {
+ assert size > 0 : "size must be positive for pooled stream registry: " + size;
+
+ outPool = new LinkedBlockingQueue<>(size);
+ inPool = new LinkedBlockingQueue<>(size);
+
+ for (int i = 0; i < size; i++) {
+ outPool.offer(createOut());
+ inPool.offer(createIn());
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override OptimizedObjectOutputStream out() throws IgniteInterruptedCheckedException {
+ try {
+ return outPool.take();
+ }
+ catch (InterruptedException e) {
+ throw new IgniteInterruptedCheckedException(
+ "Failed to take output object stream from pool (thread interrupted).", e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override OptimizedObjectInputStream in() throws IgniteInterruptedCheckedException {
+ try {
+ return inPool.take();
+ }
+ catch (InterruptedException e) {
+ throw new IgniteInterruptedCheckedException(
+ "Failed to take input object stream from pool (thread interrupted).", e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override void closeOut(OptimizedObjectOutputStream out) {
+ U.close(out, null);
+
+ boolean b = outPool.offer(out);
+
+ assert b;
+ }
+
+ /** {@inheritDoc} */
+ @Override void closeIn(OptimizedObjectInputStream in) {
+ U.close(in, null);
+
+ boolean b = inPool.offer(in);
+
+ assert b;
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/marshaller/optimized/OptimizedObjectSharedStreamRegistry.java b/modules/core/src/main/java/org/apache/ignite/internal/marshaller/optimized/OptimizedObjectSharedStreamRegistry.java
new file mode 100644
index 0000000..ac28e40
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/marshaller/optimized/OptimizedObjectSharedStreamRegistry.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.marshaller.optimized;
+
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ *
+ */
+public class OptimizedObjectSharedStreamRegistry extends OptimizedObjectStreamRegistry {
+ /** */
+ private static final ThreadLocal<StreamHolder> holders = new ThreadLocal<>();
+
+ /** {@inheritDoc} */
+ @Override OptimizedObjectOutputStream out() {
+ return holder().acquireOut();
+ }
+
+ /** {@inheritDoc} */
+ @Override OptimizedObjectInputStream in() {
+ return holder().acquireIn();
+ }
+
+ /** {@inheritDoc} */
+ @Override void closeOut(OptimizedObjectOutputStream out) {
+ U.close(out, null);
+
+ StreamHolder holder = holders.get();
+
+ if (holder != null)
+ holder.releaseOut();
+ }
+
+ /** {@inheritDoc} */
+ @Override void closeIn(OptimizedObjectInputStream in) {
+ U.close(in, null);
+
+ StreamHolder holder = holders.get();
+
+ if (holder != null)
+ holder.releaseIn();
+ }
+
+ /**
+ * Gets holder from pool or thread local.
+ *
+ * @return Stream holder.
+ */
+ private static StreamHolder holder() {
+ StreamHolder holder = holders.get();
+
+ if (holder == null)
+ holders.set(holder = new StreamHolder());
+
+ return holder;
+ }
+
+ /**
+ * Streams holder.
+ */
+ private static class StreamHolder {
+ /** Output stream. */
+ private final OptimizedObjectOutputStream out = createOut();
+
+ /** Input stream. */
+ private final OptimizedObjectInputStream in = createIn();
+
+ /** Output streams counter. */
+ private int outAcquireCnt;
+
+ /** Input streams counter. */
+ private int inAcquireCnt;
+
+ /**
+ * Gets output stream.
+ *
+ * @return Object output stream.
+ */
+ OptimizedObjectOutputStream acquireOut() {
+ return outAcquireCnt++ > 0 ? createOut() : out;
+ }
+
+ /**
+ * Gets input stream.
+ *
+ * @return Object input stream.
+ */
+ OptimizedObjectInputStream acquireIn() {
+ return inAcquireCnt++ > 0 ? createIn() : in;
+ }
+
+ /**
+ * Releases output stream.
+ */
+ void releaseOut() {
+ outAcquireCnt--;
+ }
+
+ /**
+ * Releases input stream.
+ */
+ void releaseIn() {
+ inAcquireCnt--;
+ }
+ }
+
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/marshaller/optimized/OptimizedObjectStreamRegistry.java b/modules/core/src/main/java/org/apache/ignite/internal/marshaller/optimized/OptimizedObjectStreamRegistry.java
index f022973..158cb05 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/marshaller/optimized/OptimizedObjectStreamRegistry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/marshaller/optimized/OptimizedObjectStreamRegistry.java
@@ -18,74 +18,22 @@
package org.apache.ignite.internal.marshaller.optimized;
import java.io.IOException;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
import org.apache.ignite.IgniteException;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.util.io.GridUnsafeDataInput;
import org.apache.ignite.internal.util.io.GridUnsafeDataOutput;
-import org.apache.ignite.internal.util.typedef.internal.U;
/**
* Storage for object streams.
*/
-class OptimizedObjectStreamRegistry {
- /** Holders. */
- private static final ThreadLocal<StreamHolder> holders = new ThreadLocal<>();
-
- /** Output streams pool. */
- private static BlockingQueue<OptimizedObjectOutputStream> outPool;
-
- /** Input streams pool. */
- private static BlockingQueue<OptimizedObjectInputStream> inPool;
-
- /**
- * Ensures singleton.
- */
- private OptimizedObjectStreamRegistry() {
- // No-op.
- }
-
- /**
- * Sets streams pool size.
- *
- * @param size Streams pool size.
- */
- static void poolSize(int size) {
- if (size > 0) {
- outPool = new LinkedBlockingQueue<>(size);
- inPool = new LinkedBlockingQueue<>(size);
-
- for (int i = 0; i < size; i++) {
- outPool.offer(createOut());
- inPool.offer(createIn());
- }
- }
- else {
- outPool = null;
- inPool = null;
- }
- }
-
+abstract class OptimizedObjectStreamRegistry {
/**
* Gets output stream.
*
* @return Object output stream.
* @throws org.apache.ignite.internal.IgniteInterruptedCheckedException If thread is interrupted while trying to take holder from pool.
*/
- static OptimizedObjectOutputStream out() throws IgniteInterruptedCheckedException {
- if (outPool != null) {
- try {
- return outPool.take();
- }
- catch (InterruptedException e) {
- throw new IgniteInterruptedCheckedException(
- "Failed to take output object stream from pool (thread interrupted).", e);
- }
- }
- else
- return holder().acquireOut();
- }
+ abstract OptimizedObjectOutputStream out() throws IgniteInterruptedCheckedException;
/**
* Gets input stream.
@@ -93,83 +41,28 @@ class OptimizedObjectStreamRegistry {
* @return Object input stream.
* @throws org.apache.ignite.internal.IgniteInterruptedCheckedException If thread is interrupted while trying to take holder from pool.
*/
- static OptimizedObjectInputStream in() throws IgniteInterruptedCheckedException {
- if (inPool != null) {
- try {
- return inPool.take();
- }
- catch (InterruptedException e) {
- throw new IgniteInterruptedCheckedException(
- "Failed to take input object stream from pool (thread interrupted).", e);
- }
- }
- else
- return holder().acquireIn();
- }
+ abstract OptimizedObjectInputStream in() throws IgniteInterruptedCheckedException;
/**
* Closes and releases output stream.
*
* @param out Object output stream.
*/
- static void closeOut(OptimizedObjectOutputStream out) {
- U.close(out, null);
-
- if (outPool != null) {
- boolean b = outPool.offer(out);
-
- assert b;
- }
- else {
- StreamHolder holder = holders.get();
-
- if (holder != null)
- holder.releaseOut();
- }
- }
+ abstract void closeOut(OptimizedObjectOutputStream out);
/**
* Closes and releases input stream.
*
* @param in Object input stream.
*/
- static void closeIn(OptimizedObjectInputStream in) {
- U.close(in, null);
-
- if (inPool != null) {
- boolean b = inPool.offer(in);
-
- assert b;
- }
- else {
- StreamHolder holder = holders.get();
-
- if (holder != null)
- holder.releaseIn();
- }
- }
-
- /**
- * Gets holder from pool or thread local.
- *
- * @return Stream holder.
- * @throws org.apache.ignite.internal.IgniteInterruptedCheckedException If thread is interrupted while trying to take holder from pool.
- */
- private static StreamHolder holder() throws IgniteInterruptedCheckedException {
- StreamHolder holder = holders.get();
-
- if (holder == null)
- holders.set(holder = new StreamHolder());
-
- return holder;
- }
+ abstract void closeIn(OptimizedObjectInputStream in);
/**
* Creates output stream.
*
* @return Object output stream.
*/
- private static OptimizedObjectOutputStream createOut() {
+ static OptimizedObjectOutputStream createOut() {
try {
return new OptimizedObjectOutputStream(new GridUnsafeDataOutput(4 * 1024));
}
@@ -183,7 +76,7 @@ class OptimizedObjectStreamRegistry {
*
* @return Object input stream.
*/
- private static OptimizedObjectInputStream createIn() {
+ static OptimizedObjectInputStream createIn() {
try {
return new OptimizedObjectInputStream(new GridUnsafeDataInput());
}
@@ -191,53 +84,4 @@ class OptimizedObjectStreamRegistry {
throw new IgniteException("Failed to create object input stream.", e);
}
}
-
- /**
- * Streams holder.
- */
- private static class StreamHolder {
- /** Output stream. */
- private final OptimizedObjectOutputStream out = createOut();
-
- /** Input stream. */
- private final OptimizedObjectInputStream in = createIn();
-
- /** Output streams counter. */
- private int outAcquireCnt;
-
- /** Input streams counter. */
- private int inAcquireCnt;
-
- /**
- * Gets output stream.
- *
- * @return Object output stream.
- */
- OptimizedObjectOutputStream acquireOut() {
- return outAcquireCnt++ > 0 ? createOut() : out;
- }
-
- /**
- * Gets input stream.
- *
- * @return Object input stream.
- */
- OptimizedObjectInputStream acquireIn() {
- return inAcquireCnt++ > 0 ? createIn() : in;
- }
-
- /**
- * Releases output stream.
- */
- void releaseOut() {
- outAcquireCnt--;
- }
-
- /**
- * Releases input stream.
- */
- void releaseIn() {
- inAcquireCnt--;
- }
- }
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/marshaller/optimized/OptimizedObjectStreamSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/marshaller/optimized/OptimizedObjectStreamSelfTest.java
index 4b48a22..6a8dc50 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/marshaller/optimized/OptimizedObjectStreamSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/marshaller/optimized/OptimizedObjectStreamSelfTest.java
@@ -1026,7 +1026,9 @@ public class OptimizedObjectStreamSelfTest extends GridCommonAbstractTest {
*/
@Test
public void testReadToArray() throws Exception {
- OptimizedObjectInputStream in = OptimizedObjectStreamRegistry.in();
+ OptimizedObjectStreamRegistry reg = new OptimizedObjectSharedStreamRegistry();
+
+ OptimizedObjectInputStream in = reg.in();
try {
byte[] arr = new byte[50];
@@ -1065,7 +1067,7 @@ public class OptimizedObjectStreamSelfTest extends GridCommonAbstractTest {
assertEquals(i < 10 ? 40 + i : 0, buf[i]);
}
finally {
- OptimizedObjectStreamRegistry.closeIn(in);
+ reg.closeIn(in);
}
}
@@ -1178,8 +1180,10 @@ public class OptimizedObjectStreamSelfTest extends GridCommonAbstractTest {
OptimizedObjectOutputStream out = null;
OptimizedObjectInputStream in = null;
+ OptimizedObjectStreamRegistry reg = new OptimizedObjectSharedStreamRegistry();
+
try {
- out = OptimizedObjectStreamRegistry.out();
+ out = reg.out();
out.context(clsMap, CTX, null, true);
@@ -1187,7 +1191,7 @@ public class OptimizedObjectStreamSelfTest extends GridCommonAbstractTest {
byte[] arr = out.out().array();
- in = OptimizedObjectStreamRegistry.in();
+ in = reg.in();
in.context(clsMap, CTX, null, getClass().getClassLoader());
@@ -1200,8 +1204,8 @@ public class OptimizedObjectStreamSelfTest extends GridCommonAbstractTest {
return (T)obj0;
}
finally {
- OptimizedObjectStreamRegistry.closeOut(out);
- OptimizedObjectStreamRegistry.closeIn(in);
+ reg.closeOut(out);
+ reg.closeIn(in);
}
}
@@ -1210,10 +1214,8 @@ public class OptimizedObjectStreamSelfTest extends GridCommonAbstractTest {
*
* @param out Output stream.
* @param in Input stream.
- * @throws Exception If failed.
*/
- private void checkHandles(OptimizedObjectOutputStream out, OptimizedObjectInputStream in)
- throws Exception {
+ private void checkHandles(OptimizedObjectOutputStream out, OptimizedObjectInputStream in) {
Object[] outHandles = out.handledObjects();
Object[] inHandles = in.handledObjects();
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteReproducingSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteReproducingSuite.java
index 5bc41f4..3eb911a 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteReproducingSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteReproducingSuite.java
@@ -19,7 +19,7 @@ package org.apache.ignite.testsuites;
import java.util.ArrayList;
import java.util.List;
-import org.apache.ignite.cache.RemoveAllDeadlockTest;
+import org.apache.ignite.internal.marshaller.optimized.OptimizedMarshallerPooledSelfTest;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -49,8 +49,8 @@ public class IgniteReproducingSuite {
suite.add(IgniteReproducingSuite.TestStub.class);
//uncomment to add some test
- for (int i = 0; i < 50; i++)
- suite.add(RemoveAllDeadlockTest.class);
+ for (int i = 0; i < 500; i++)
+ suite.add(OptimizedMarshallerPooledSelfTest.class);
return suite;
}