You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@calcite.apache.org by vl...@apache.org on 2019/01/08 17:05:50 UTC
[calcite] branch master updated: [CALCITE-2146] Errant CyclicMetadataException in multithreaded context (Paul Jackson)
This is an automated email from the ASF dual-hosted git repository.
vladimirsitnikov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/calcite.git
The following commit(s) were added to refs/heads/master by this push:
new 0fd4ca8 [CALCITE-2146] Errant CyclicMetadataException in multithreaded context (Paul Jackson)
0fd4ca8 is described below
commit 0fd4ca89d0cc8fa4ea8fd17c1657b28a3d8cf533
Author: Paul Jackson <pa...@stardog.com>
AuthorDate: Mon Jan 22 13:21:59 2018 -0500
[CALCITE-2146] Errant CyclicMetadataException in multithreaded context (Paul Jackson)
fixes #614
---
.../org/apache/calcite/plan/RelOptCluster.java | 2 +-
.../rel/metadata/JaninoRelMetadataProvider.java | 38 ++++++++--------
.../org/apache/calcite/test/RelBuilderTest.java | 53 ++++++++++++++++++++++
3 files changed, 74 insertions(+), 19 deletions(-)
diff --git a/core/src/main/java/org/apache/calcite/plan/RelOptCluster.java b/core/src/main/java/org/apache/calcite/plan/RelOptCluster.java
index da6b8eb..a078354 100644
--- a/core/src/main/java/org/apache/calcite/plan/RelOptCluster.java
+++ b/core/src/main/java/org/apache/calcite/plan/RelOptCluster.java
@@ -147,7 +147,7 @@ public class RelOptCluster {
* If you have a {@link RelOptRuleCall} available,
* for example if you are in a {@link RelOptRule#onMatch(RelOptRuleCall)}
* method, then use {@link RelOptRuleCall#getMetadataQuery()} instead. */
- public RelMetadataQuery getMetadataQuery() {
+ public synchronized RelMetadataQuery getMetadataQuery() {
if (mq == null) {
mq = RelMetadataQuery.instance();
}
diff --git a/core/src/main/java/org/apache/calcite/rel/metadata/JaninoRelMetadataProvider.java b/core/src/main/java/org/apache/calcite/rel/metadata/JaninoRelMetadataProvider.java
old mode 100644
new mode 100755
index e206ff3..888f10d
--- a/core/src/main/java/org/apache/calcite/rel/metadata/JaninoRelMetadataProvider.java
+++ b/core/src/main/java/org/apache/calcite/rel/metadata/JaninoRelMetadataProvider.java
@@ -269,44 +269,46 @@ public class JaninoRelMetadataProvider implements RelMetadataProvider {
buff.append(", r");
safeArgList(buff, method.e)
.append(");\n")
- .append(" final Object v = mq.map.get(key);\n")
- .append(" if (v != null) {\n")
- .append(" if (v == ")
+ .append(" synchronized (mq.map) {")
+ .append(" final Object v = mq.map.get(key);\n")
+ .append(" if (v != null) {\n")
+ .append(" if (v == ")
.append(NullSentinel.class.getName())
.append(".ACTIVE) {\n")
- .append(" throw ")
+ .append(" throw ")
.append(CyclicMetadataException.class.getName())
.append(".INSTANCE;\n")
- .append(" }\n")
- .append(" if (v == ")
+ .append(" }\n")
+ .append(" if (v == ")
.append(NullSentinel.class.getName())
.append(".INSTANCE) {\n")
- .append(" return null;\n")
- .append(" }\n")
- .append(" return (")
+ .append(" return null;\n")
+ .append(" }\n")
+ .append(" return (")
.append(method.e.getReturnType().getName())
.append(") v;\n")
- .append(" }\n")
- .append(" mq.map.put(key,")
+ .append(" }\n")
+ .append(" mq.map.put(key,")
.append(NullSentinel.class.getName())
.append(".ACTIVE);\n")
- .append(" try {\n")
- .append(" final ")
+ .append(" try {\n")
+ .append(" final ")
.append(method.e.getReturnType().getName())
.append(" x = ")
.append(method.e.getName())
.append("_(r, mq");
argList(buff, method.e)
.append(");\n")
- .append(" mq.map.put(key, ")
+ .append(" mq.map.put(key, ")
.append(NullSentinel.class.getName())
.append(".mask(x));\n")
- .append(" return x;\n")
- .append(" } catch (")
+ .append(" return x;\n")
+ .append(" } catch (")
.append(Exception.class.getName())
.append(" e) {\n")
- .append(" mq.map.remove(key);\n")
- .append(" throw e;\n")
+ .append(" mq.map.remove(key);\n")
+ .append(" throw e;\n")
+ .append(" }\n")
.append(" }\n")
.append(" }\n")
.append("\n")
diff --git a/core/src/test/java/org/apache/calcite/test/RelBuilderTest.java b/core/src/test/java/org/apache/calcite/test/RelBuilderTest.java
old mode 100644
new mode 100755
index d345241..ca02f4b
--- a/core/src/test/java/org/apache/calcite/test/RelBuilderTest.java
+++ b/core/src/test/java/org/apache/calcite/test/RelBuilderTest.java
@@ -69,11 +69,18 @@ import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.TreeSet;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import static org.apache.calcite.test.Matchers.hasTree;
@@ -1984,6 +1991,52 @@ public class RelBuilderTest {
assertThat(root, hasTree(expected));
}
+ @Test public void testConcurrentProject() {
+ // On my quad core hyperthreaded i7, 2 threads passed 10 times in 10 attempts,
+ // 4 threads passed 2/10, and 8 threads passed 0/10.
+ final int numThreads = 16;
+ ExecutorService service = Executors.newFixedThreadPool(numThreads);
+ final CyclicBarrier barrier = new CyclicBarrier(numThreads);
+
+ // When synchronized(mq.map) is not there, the error happens at iteration 0 or 1
+ for (int iteration = 0; iteration < 10; iteration++) {
+ barrier.reset();
+ final FrameworkConfig config = config().build();
+ final RelBuilder builder = RelBuilder.create(config);
+ final RelNode r = builder.scan("EMP").build();
+ List<Future<RelNode>> futures = new ArrayList<>(numThreads);
+ final Callable<RelNode> callable = () -> {
+ final RelBuilder builder1 = RelBuilder.create(config);
+ builder1.push(r);
+ final List<RexNode> fields = Lists.newArrayList((RexNode) builder1.field("ENAME"));
+ final List<String> fieldNames = Lists.newArrayList("F1");
+ barrier.await();
+ builder1.project(fields, fieldNames, true);
+ return builder1.build();
+ };
+ for (int i = 0; i < numThreads; i++) {
+ futures.add(service.submit(callable));
+ }
+ for (int i = 0; i < futures.size(); i++) {
+ Future<RelNode> f = futures.get(i);
+ final RelNode node;
+ try {
+ node = f.get();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ } catch (ExecutionException e) {
+ throw new RuntimeException(
+ "Unable to build relation concurrently: " + e.getMessage()
+ + ", exception at iteration " + iteration, e.getCause());
+ }
+ String expected = "LogicalProject(F1=[$1])\n"
+ + " LogicalTableScan(table=[[scott, EMP]])\n";
+ assertThat("Plan for thread #" + i, node, hasTree(expected));
+ }
+ }
+ }
+
/** Tests that a sort on a field followed by a limit gives the same
* effect as calling sortLimit.
*