You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@calcite.apache.org by vl...@apache.org on 2019/01/08 17:05:50 UTC

[calcite] branch master updated: [CALCITE-2146] Errant CyclicMetadataException in multithreaded context (Paul Jackson)

This is an automated email from the ASF dual-hosted git repository.

vladimirsitnikov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/calcite.git


The following commit(s) were added to refs/heads/master by this push:
     new 0fd4ca8  [CALCITE-2146] Errant CyclicMetadataException in multithreaded context (Paul Jackson)
0fd4ca8 is described below

commit 0fd4ca89d0cc8fa4ea8fd17c1657b28a3d8cf533
Author: Paul Jackson <pa...@stardog.com>
AuthorDate: Mon Jan 22 13:21:59 2018 -0500

    [CALCITE-2146] Errant CyclicMetadataException in multithreaded context (Paul Jackson)
    
    fixes #614
---
 .../org/apache/calcite/plan/RelOptCluster.java     |  2 +-
 .../rel/metadata/JaninoRelMetadataProvider.java    | 38 ++++++++--------
 .../org/apache/calcite/test/RelBuilderTest.java    | 53 ++++++++++++++++++++++
 3 files changed, 74 insertions(+), 19 deletions(-)

diff --git a/core/src/main/java/org/apache/calcite/plan/RelOptCluster.java b/core/src/main/java/org/apache/calcite/plan/RelOptCluster.java
index da6b8eb..a078354 100644
--- a/core/src/main/java/org/apache/calcite/plan/RelOptCluster.java
+++ b/core/src/main/java/org/apache/calcite/plan/RelOptCluster.java
@@ -147,7 +147,7 @@ public class RelOptCluster {
    * If you have a {@link RelOptRuleCall} available,
    * for example if you are in a {@link RelOptRule#onMatch(RelOptRuleCall)}
    * method, then use {@link RelOptRuleCall#getMetadataQuery()} instead. */
-  public RelMetadataQuery getMetadataQuery() {
+  public synchronized RelMetadataQuery getMetadataQuery() {
     if (mq == null) {
       mq = RelMetadataQuery.instance();
     }
diff --git a/core/src/main/java/org/apache/calcite/rel/metadata/JaninoRelMetadataProvider.java b/core/src/main/java/org/apache/calcite/rel/metadata/JaninoRelMetadataProvider.java
old mode 100644
new mode 100755
index e206ff3..888f10d
--- a/core/src/main/java/org/apache/calcite/rel/metadata/JaninoRelMetadataProvider.java
+++ b/core/src/main/java/org/apache/calcite/rel/metadata/JaninoRelMetadataProvider.java
@@ -269,44 +269,46 @@ public class JaninoRelMetadataProvider implements RelMetadataProvider {
       buff.append(", r");
       safeArgList(buff, method.e)
           .append(");\n")
-          .append("    final Object v = mq.map.get(key);\n")
-          .append("    if (v != null) {\n")
-          .append("      if (v == ")
+          .append("    synchronized (mq.map) {")
+          .append("      final Object v = mq.map.get(key);\n")
+          .append("      if (v != null) {\n")
+          .append("        if (v == ")
           .append(NullSentinel.class.getName())
           .append(".ACTIVE) {\n")
-          .append("        throw ")
+          .append("          throw ")
           .append(CyclicMetadataException.class.getName())
           .append(".INSTANCE;\n")
-          .append("      }\n")
-          .append("      if (v == ")
+          .append("        }\n")
+          .append("        if (v == ")
           .append(NullSentinel.class.getName())
           .append(".INSTANCE) {\n")
-          .append("        return null;\n")
-          .append("      }\n")
-          .append("      return (")
+          .append("          return null;\n")
+          .append("        }\n")
+          .append("        return (")
           .append(method.e.getReturnType().getName())
           .append(") v;\n")
-          .append("    }\n")
-          .append("    mq.map.put(key,")
+          .append("      }\n")
+          .append("      mq.map.put(key,")
           .append(NullSentinel.class.getName())
           .append(".ACTIVE);\n")
-          .append("    try {\n")
-          .append("      final ")
+          .append("      try {\n")
+          .append("        final ")
           .append(method.e.getReturnType().getName())
           .append(" x = ")
           .append(method.e.getName())
           .append("_(r, mq");
       argList(buff, method.e)
           .append(");\n")
-          .append("      mq.map.put(key, ")
+          .append("        mq.map.put(key, ")
           .append(NullSentinel.class.getName())
           .append(".mask(x));\n")
-          .append("      return x;\n")
-          .append("    } catch (")
+          .append("        return x;\n")
+          .append("      } catch (")
           .append(Exception.class.getName())
           .append(" e) {\n")
-          .append("      mq.map.remove(key);\n")
-          .append("      throw e;\n")
+          .append("        mq.map.remove(key);\n")
+          .append("        throw e;\n")
+          .append("      }\n")
           .append("    }\n")
           .append("  }\n")
           .append("\n")
diff --git a/core/src/test/java/org/apache/calcite/test/RelBuilderTest.java b/core/src/test/java/org/apache/calcite/test/RelBuilderTest.java
old mode 100644
new mode 100755
index d345241..ca02f4b
--- a/core/src/test/java/org/apache/calcite/test/RelBuilderTest.java
+++ b/core/src/test/java/org/apache/calcite/test/RelBuilderTest.java
@@ -69,11 +69,18 @@ import java.sql.DriverManager;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Locale;
 import java.util.TreeSet;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
 import static org.apache.calcite.test.Matchers.hasTree;
 
@@ -1984,6 +1991,52 @@ public class RelBuilderTest {
     assertThat(root, hasTree(expected));
   }
 
+  @Test public void testConcurrentProject() {
+    // On my quad core hyperthreaded i7, 2 threads passed 10 times in 10 attempts,
+    // 4 threads passed 2/10, and 8 threads passed 0/0.
+    final int numThreads = 16;
+    ExecutorService service = Executors.newFixedThreadPool(numThreads);
+    final CyclicBarrier barrier = new CyclicBarrier(numThreads);
+
+    // When synchronized(mq.map) is not there, the error happens at iteration 0 or 1
+    for (int iteration = 0; iteration < 10; iteration++) {
+      barrier.reset();
+      final FrameworkConfig config = config().build();
+      final RelBuilder builder = RelBuilder.create(config);
+      final RelNode r = builder.scan("EMP").build();
+      List<Future<RelNode>> futures = new ArrayList<>(numThreads);
+      final Callable<RelNode> callable = () -> {
+        final RelBuilder builder1 = RelBuilder.create(config);
+        builder1.push(r);
+        final List<RexNode> fields = Lists.newArrayList((RexNode) builder1.field("ENAME"));
+        final List<String> fieldNames = Lists.newArrayList("F1");
+        barrier.await();
+        builder1.project(fields, fieldNames, true);
+        return builder1.build();
+      };
+      for (int i = 0; i < numThreads; i++) {
+        futures.add(service.submit(callable));
+      }
+      for (int i = 0; i < futures.size(); i++) {
+        Future<RelNode> f = futures.get(i);
+        final RelNode node;
+        try {
+          node = f.get();
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          throw new RuntimeException(e);
+        } catch (ExecutionException e) {
+          throw new RuntimeException(
+              "Unable to build relation concurrently: " + e.getMessage()
+                  + ", exception at iteration " + iteration, e.getCause());
+        }
+        String expected = "LogicalProject(F1=[$1])\n"
+            + "  LogicalTableScan(table=[[scott, EMP]])\n";
+        assertThat("Plan for thread #" + i, node, hasTree(expected));
+      }
+    }
+  }
+
   /** Tests that a sort on a field followed by a limit gives the same
    * effect as calling sortLimit.
    *