You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by pr...@apache.org on 2017/09/14 22:43:15 UTC

drill git commit: DRILL-5269 Make DirectSubScan Jackson JSON deserializable

Repository: drill
Updated Branches:
  refs/heads/master aaff1b35b -> 0b1cb98ad


DRILL-5269 Make DirectSubScan Jackson JSON deserializable

closes #926


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/0b1cb98a
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/0b1cb98a
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/0b1cb98a

Branch: refs/heads/master
Commit: 0b1cb98ad65b2057e5540f84986eb718850dad20
Parents: aaff1b3
Author: Vlad Rozov <vr...@apache.org>
Authored: Tue Aug 29 17:05:24 2017 -0700
Committer: Paul Rogers <pr...@maprtech.com>
Committed: Thu Sep 14 15:09:53 2017 -0700

----------------------------------------------------------------------
 .../drill/exec/planner/PhysicalPlanReader.java  | 26 ++++++--
 .../drill/exec/store/direct/DirectSubScan.java  | 16 ++++-
 .../store/pojo/AbstractPojoRecordReader.java    |  4 +-
 .../store/pojo/DynamicPojoRecordReader.java     | 40 ++++++++++++
 .../apache/drill/exec/work/foreman/Foreman.java |  2 +-
 .../exec/work/fragment/FragmentExecutor.java    |  2 +-
 .../java/org/apache/drill/TestBugFixes.java     | 23 +++++++
 .../apache/drill/exec/TestOpSerialization.java  | 65 ++++++++++++++++++--
 8 files changed, 165 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/0b1cb98a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PhysicalPlanReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PhysicalPlanReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PhysicalPlanReader.java
index f8284aa..e275d3c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PhysicalPlanReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PhysicalPlanReader.java
@@ -35,12 +35,15 @@ import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.record.MajorTypeSerDe;
 import org.apache.drill.exec.server.options.OptionList;
 import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.pojo.DynamicPojoRecordReader;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.InjectableValues;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.ObjectReader;
+import com.fasterxml.jackson.databind.deser.std.StdDelegatingDeserializer;
 import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.google.common.annotations.VisibleForTesting;
 
 public class PhysicalPlanReader {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PhysicalPlanReader.class);
@@ -53,19 +56,23 @@ public class PhysicalPlanReader {
   public PhysicalPlanReader(DrillConfig config, ScanResult scanResult, LogicalPlanPersistence lpPersistance, final DrillbitEndpoint endpoint,
                             final StoragePluginRegistry pluginRegistry) {
 
+    ObjectMapper lpMapper = lpPersistance.getMapper();
+
     // Endpoint serializer/deserializer.
     SimpleModule deserModule = new SimpleModule("PhysicalOperatorModule") //
         .addSerializer(DrillbitEndpoint.class, new DrillbitEndpointSerDe.Se()) //
         .addDeserializer(DrillbitEndpoint.class, new DrillbitEndpointSerDe.De()) //
         .addSerializer(MajorType.class, new MajorTypeSerDe.Se())
-        .addDeserializer(MajorType.class, new MajorTypeSerDe.De());
+        .addDeserializer(MajorType.class, new MajorTypeSerDe.De())
+        .addDeserializer(DynamicPojoRecordReader.class,
+            new StdDelegatingDeserializer<>(new DynamicPojoRecordReader.Converter(lpMapper)));
 
-    ObjectMapper lpMapper = lpPersistance.getMapper();
     lpMapper.registerModule(deserModule);
     Set<Class<? extends PhysicalOperator>> subTypes = PhysicalOperatorUtil.getSubTypes(scanResult);
     for (Class<? extends PhysicalOperator> subType : subTypes) {
       lpMapper.registerSubtypes(subType);
     }
+    lpMapper.registerSubtypes(DynamicPojoRecordReader.class);
     InjectableValues injectables = new InjectableValues.Std() //
             .addValue(StoragePluginRegistry.class, pluginRegistry) //
         .addValue(DrillbitEndpoint.class, endpoint); //
@@ -89,16 +96,27 @@ public class PhysicalPlanReader {
     return physicalPlanReader.readValue(json);
   }
 
-  public FragmentRoot readFragmentOperator(String json) throws JsonProcessingException, IOException {
+  public FragmentRoot readFragmentRoot(String json) throws JsonProcessingException, IOException {
     logger.debug("Attempting to read {}", json);
     PhysicalOperator op = operatorReader.readValue(json);
-    if(op instanceof FragmentLeaf){
+    if(op instanceof FragmentRoot){
       return (FragmentRoot) op;
     }else{
       throw new UnsupportedOperationException(String.format("The provided json fragment doesn't have a FragmentRoot as its root operator.  The operator was %s.", op.getClass().getCanonicalName()));
     }
   }
 
+  @VisibleForTesting
+  public FragmentLeaf readFragmentLeaf(String json) throws JsonProcessingException, IOException {
+    logger.debug("Attempting to read {}", json);
+    PhysicalOperator op = operatorReader.readValue(json);
+    if (op instanceof FragmentLeaf){
+      return (FragmentLeaf) op;
+    } else {
+      throw new UnsupportedOperationException(String.format("The provided json fragment is not a FragmentLeaf. The operator was %s.", op.getClass().getCanonicalName()));
+    }
+  }
+
   public LogicalPlan readLogicalPlan(String json) throws JsonProcessingException, IOException{
     logger.debug("Reading logical plan {}", json);
     return logicalPlanReader.readValue(json);

http://git-wip-us.apache.org/repos/asf/drill/blob/0b1cb98a/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectSubScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectSubScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectSubScan.java
index 763ecba..ac3fe8d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectSubScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectSubScan.java
@@ -21,16 +21,28 @@ import org.apache.drill.exec.physical.base.AbstractSubScan;
 import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
 import org.apache.drill.exec.store.RecordReader;
 
-public class DirectSubScan extends AbstractSubScan{
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+import static com.fasterxml.jackson.annotation.JsonTypeInfo.Id.NAME;
+import static com.fasterxml.jackson.annotation.JsonTypeInfo.As.WRAPPER_OBJECT;
+
+@JsonTypeName("direct-sub-scan")
+public class DirectSubScan extends AbstractSubScan {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DirectSubScan.class);
 
+  @JsonTypeInfo(use=NAME, include=WRAPPER_OBJECT)
   private final RecordReader reader;
 
-  public DirectSubScan(RecordReader reader) {
+  @JsonCreator
+  public DirectSubScan(@JsonProperty("reader") RecordReader reader) {
     super(null);
     this.reader = reader;
   }
 
+  @JsonProperty
   public RecordReader getReader() {
     return reader;
   }

http://git-wip-us.apache.org/repos/asf/drill/blob/0b1cb98a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/AbstractPojoRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/AbstractPojoRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/AbstractPojoRecordReader.java
index 0c1144a..c2f213d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/AbstractPojoRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/AbstractPojoRecordReader.java
@@ -27,6 +27,8 @@ import org.apache.drill.exec.testing.ControlsInjectorFactory;
 import org.apache.drill.exec.vector.AllocationHelper;
 import org.apache.drill.exec.vector.ValueVector;
 
+import com.fasterxml.jackson.annotation.JsonProperty;
+
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -40,7 +42,7 @@ public abstract class AbstractPojoRecordReader<T> extends AbstractRecordReader i
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractPojoRecordReader.class);
   private static final ControlsInjector injector = ControlsInjectorFactory.getInjector(AbstractPojoRecordReader.class);
 
-  protected final List<T> records;
+  @JsonProperty protected final List<T> records;
   protected List<PojoWriter> writers;
 
   private Iterator<T> currentIterator;

http://git-wip-us.apache.org/repos/asf/drill/blob/0b1cb98a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/DynamicPojoRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/DynamicPojoRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/DynamicPojoRecordReader.java
index 82383f0..167c721 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/DynamicPojoRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/DynamicPojoRecordReader.java
@@ -21,7 +21,16 @@ import com.google.common.base.Preconditions;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.physical.impl.OutputMutator;
 
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.util.StdConverter;
+
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -32,8 +41,10 @@ import java.util.Map;
  *
  * @param <T> type of given values, if contains various types, use Object class
  */
+@JsonTypeName("dynamic-pojo-record-reader")
 public class DynamicPojoRecordReader<T> extends AbstractPojoRecordReader<List<T>> {
 
+  @JsonProperty
   private final LinkedHashMap<String, Class<?>> schema;
 
   public DynamicPojoRecordReader(LinkedHashMap<String, Class<?>> schema, List<List<T>> records) {
@@ -68,4 +79,33 @@ public class DynamicPojoRecordReader<T> extends AbstractPojoRecordReader<List<T>
         "records = " + records +
         "}";
   }
+
+  /**
+   * A utility class that converts from {@link com.fasterxml.jackson.databind.JsonNode}
+   * to DynamicPojoRecordReader during physical plan fragment deserialization.
+   */
+  public static class Converter extends StdConverter<JsonNode, DynamicPojoRecordReader>
+  {
+    private static final TypeReference<LinkedHashMap<String, Class<?>>> schemaType =
+        new TypeReference<LinkedHashMap<String, Class<?>>>() {};
+
+    private final ObjectMapper mapper;
+
+    public Converter(ObjectMapper mapper)
+    {
+      this.mapper = mapper;
+    }
+
+    @Override
+    public DynamicPojoRecordReader convert(JsonNode value) {
+      LinkedHashMap<String, Class<?>> schema = mapper.convertValue(value.get("schema"), schemaType);
+
+      ArrayList records = new ArrayList(schema.size());
+      final Iterator<JsonNode> recordsIterator = value.get("records").get(0).elements();
+      for (Class<?> fieldType : schema.values()) {
+        records.add(mapper.convertValue(recordsIterator.next(), fieldType));
+      }
+      return new DynamicPojoRecordReader(schema, Collections.singletonList(records));
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/0b1cb98a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index 62c2307..8144da1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -493,7 +493,7 @@ public class Foreman implements Runnable {
 
     final FragmentRoot rootOperator;
     try {
-      rootOperator = drillbitContext.getPlanReader().readFragmentOperator(rootFragment.getFragmentJson());
+      rootOperator = drillbitContext.getPlanReader().readFragmentRoot(rootFragment.getFragmentJson());
     } catch (IOException e) {
       throw new ExecutionSetupException(String.format("Unable to parse FragmentRoot from fragment: %s", rootFragment.getFragmentJson()));
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/0b1cb98a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
index c57869a..daa94b7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
@@ -202,7 +202,7 @@ public class FragmentExecutor implements Runnable {
 
       // if we didn't get the root operator when the executor was created, create it now.
       final FragmentRoot rootOperator = this.rootOperator != null ? this.rootOperator :
-          drillbitContext.getPlanReader().readFragmentOperator(fragment.getFragmentJson());
+          drillbitContext.getPlanReader().readFragmentRoot(fragment.getFragmentJson());
 
           root = ImplCreator.getExec(fragmentContext, rootOperator);
           if (root == null) {

http://git-wip-us.apache.org/repos/asf/drill/blob/0b1cb98a/exec/java-exec/src/test/java/org/apache/drill/TestBugFixes.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestBugFixes.java b/exec/java-exec/src/test/java/org/apache/drill/TestBugFixes.java
index 1a4d63b..9b25860 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestBugFixes.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestBugFixes.java
@@ -273,4 +273,27 @@ public class TestBugFixes extends BaseTestQuery {
         .baselineValues((long) 2)
         .go();
   }
+
+  @Test
+  public void testDRILL5269() throws Exception {
+    try {
+      test("ALTER SESSION SET `planner.enable_nljoin_for_scalar_only` = false");
+      test("ALTER SESSION SET `planner.slice_target` = 500");
+      test("\nSELECT `one` FROM (\n" +
+          "  SELECT 1 `one` FROM cp.`tpch/nation.parquet`\n" +
+          "  INNER JOIN (\n" +
+          "    SELECT 2 `two` FROM cp.`tpch/nation.parquet`\n" +
+          "  ) `t0` ON (\n" +
+          "    `tpch/nation.parquet`.n_regionkey IS NOT DISTINCT FROM `t0`.`two`\n" +
+          "  )\n" +
+          "  GROUP BY `one`\n" +
+          ") `t1`\n" +
+          "  INNER JOIN (\n" +
+          "    SELECT count(1) `a_count` FROM cp.`tpch/nation.parquet`\n" +
+          ") `t5` ON TRUE\n");
+    } finally {
+      test("ALTER SESSION RESET `planner.enable_nljoin_for_scalar_only`");
+      test("ALTER SESSION RESET `planner.slice_target`");
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/0b1cb98a/exec/java-exec/src/test/java/org/apache/drill/exec/TestOpSerialization.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/TestOpSerialization.java b/exec/java-exec/src/test/java/org/apache/drill/exec/TestOpSerialization.java
index 7237183..67bd347 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/TestOpSerialization.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/TestOpSerialization.java
@@ -19,7 +19,12 @@
 package org.apache.drill.exec;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
 
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
 import java.util.List;
 
 import org.apache.drill.common.config.DrillConfig;
@@ -35,18 +40,71 @@ import org.apache.drill.exec.physical.config.UnionExchange;
 import org.apache.drill.exec.planner.PhysicalPlanReader;
 import org.apache.drill.exec.planner.PhysicalPlanReaderTestFactory;
 import org.apache.drill.exec.proto.CoordinationProtos;
+import org.apache.drill.exec.store.direct.DirectSubScan;
 import org.apache.drill.exec.store.mock.MockSubScanPOP;
+import org.apache.drill.exec.store.pojo.DynamicPojoRecordReader;
+
+import org.junit.Before;
 import org.junit.Test;
 
+import com.fasterxml.jackson.databind.ObjectWriter;
 import com.google.common.collect.Lists;
 
 public class TestOpSerialization {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestOpSerialization.class);
+  private DrillConfig config;
+  private PhysicalPlanReader reader;
+  private ObjectWriter writer;
+
+  private static PhysicalOperator setupPhysicalOperator(PhysicalOperator operator)
+  {
+    operator.setOperatorId(1);
+    operator.setCost(1.0);
+    operator.setMaxAllocation(1000);
+    return operator;
+  }
+
+  private static void assertOperator(PhysicalOperator operator)
+  {
+    assertEquals(1, operator.getOperatorId());
+    assertEquals(1.0, operator.getCost(), 0.00001);
+    assertEquals(1000, operator.getMaxAllocation());
+  }
+
+  @Before
+  public void setUp()
+  {
+    config = DrillConfig.create();
+    reader = PhysicalPlanReaderTestFactory.defaultPhysicalPlanReader(config);
+    LogicalPlanPersistence logicalPlanPersistence = PhysicalPlanReaderTestFactory.defaultLogicalPlanPersistence(config);
+    writer = logicalPlanPersistence.getMapper().writer();
+  }
+
+  @Test
+  public void testDirectSubScan() throws Exception {
+    LinkedHashMap<String, Class<?>> schema = new LinkedHashMap<>();
+    schema.put("count1", Long.class);
+    schema.put("count2", Long.class);
+
+    DirectSubScan scan = new DirectSubScan(new DynamicPojoRecordReader<>(schema,
+        Collections.singletonList(Arrays.asList(0L, 1L))));
+    scan = (DirectSubScan) reader.readFragmentLeaf(writer.writeValueAsString(setupPhysicalOperator(scan)));
+    assertOperator(scan);
+  }
+
+  @Test
+  public void testMockSubScan() throws Exception {
+    MockSubScanPOP scan = new MockSubScanPOP("abc", false, null);
+    scan.setOperatorId(1);
+    scan = (MockSubScanPOP) reader.readFragmentLeaf(writer.writeValueAsString(setupPhysicalOperator(scan)));
+    assertOperator(scan);
+    assertEquals("abc", scan.getUrl());
+    assertNull(scan.getReadEntries());
+    assertFalse(scan.isExtended());
+  }
 
   @Test
   public void testSerializedDeserialize() throws Throwable {
-    DrillConfig c = DrillConfig.create();
-    PhysicalPlanReader reader = PhysicalPlanReaderTestFactory.defaultPhysicalPlanReader(c);
     MockSubScanPOP s = new MockSubScanPOP("abc", false, null);
     s.setOperatorId(3);
     Filter f = new Filter(s, new ValueExpressions.BooleanExpression("true", ExpressionPosition.UNKNOWN), 0.1f);
@@ -69,8 +127,7 @@ public class TestOpSerialization {
         pops = Lists.reverse(pops);
       }
       PhysicalPlan plan1 = new PhysicalPlan(PlanProperties.builder().build(), pops);
-      LogicalPlanPersistence logicalPlanPersistence = PhysicalPlanReaderTestFactory.defaultLogicalPlanPersistence(c);
-      String json = plan1.unparse(logicalPlanPersistence.getMapper().writer());
+      String json = plan1.unparse(writer);
 
       PhysicalPlan plan2 = reader.readPhysicalPlan(json);