You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by pr...@apache.org on 2017/09/14 22:43:15 UTC
drill git commit: DRILL-5269 Make DirectSubScan Jackson JSON deserializable
Repository: drill
Updated Branches:
refs/heads/master aaff1b35b -> 0b1cb98ad
DRILL-5269 Make DirectSubScan Jackson JSON deserializable
closes #926
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/0b1cb98a
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/0b1cb98a
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/0b1cb98a
Branch: refs/heads/master
Commit: 0b1cb98ad65b2057e5540f84986eb718850dad20
Parents: aaff1b3
Author: Vlad Rozov <vr...@apache.org>
Authored: Tue Aug 29 17:05:24 2017 -0700
Committer: Paul Rogers <pr...@maprtech.com>
Committed: Thu Sep 14 15:09:53 2017 -0700
----------------------------------------------------------------------
.../drill/exec/planner/PhysicalPlanReader.java | 26 ++++++--
.../drill/exec/store/direct/DirectSubScan.java | 16 ++++-
.../store/pojo/AbstractPojoRecordReader.java | 4 +-
.../store/pojo/DynamicPojoRecordReader.java | 40 ++++++++++++
.../apache/drill/exec/work/foreman/Foreman.java | 2 +-
.../exec/work/fragment/FragmentExecutor.java | 2 +-
.../java/org/apache/drill/TestBugFixes.java | 23 +++++++
.../apache/drill/exec/TestOpSerialization.java | 65 ++++++++++++++++++--
8 files changed, 165 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/0b1cb98a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PhysicalPlanReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PhysicalPlanReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PhysicalPlanReader.java
index f8284aa..e275d3c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PhysicalPlanReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PhysicalPlanReader.java
@@ -35,12 +35,15 @@ import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.record.MajorTypeSerDe;
import org.apache.drill.exec.server.options.OptionList;
import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.pojo.DynamicPojoRecordReader;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectReader;
+import com.fasterxml.jackson.databind.deser.std.StdDelegatingDeserializer;
import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.google.common.annotations.VisibleForTesting;
public class PhysicalPlanReader {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PhysicalPlanReader.class);
@@ -53,19 +56,23 @@ public class PhysicalPlanReader {
public PhysicalPlanReader(DrillConfig config, ScanResult scanResult, LogicalPlanPersistence lpPersistance, final DrillbitEndpoint endpoint,
final StoragePluginRegistry pluginRegistry) {
+ ObjectMapper lpMapper = lpPersistance.getMapper();
+
// Endpoint serializer/deserializer.
SimpleModule deserModule = new SimpleModule("PhysicalOperatorModule") //
.addSerializer(DrillbitEndpoint.class, new DrillbitEndpointSerDe.Se()) //
.addDeserializer(DrillbitEndpoint.class, new DrillbitEndpointSerDe.De()) //
.addSerializer(MajorType.class, new MajorTypeSerDe.Se())
- .addDeserializer(MajorType.class, new MajorTypeSerDe.De());
+ .addDeserializer(MajorType.class, new MajorTypeSerDe.De())
+ .addDeserializer(DynamicPojoRecordReader.class,
+ new StdDelegatingDeserializer<>(new DynamicPojoRecordReader.Converter(lpMapper)));
- ObjectMapper lpMapper = lpPersistance.getMapper();
lpMapper.registerModule(deserModule);
Set<Class<? extends PhysicalOperator>> subTypes = PhysicalOperatorUtil.getSubTypes(scanResult);
for (Class<? extends PhysicalOperator> subType : subTypes) {
lpMapper.registerSubtypes(subType);
}
+ lpMapper.registerSubtypes(DynamicPojoRecordReader.class);
InjectableValues injectables = new InjectableValues.Std() //
.addValue(StoragePluginRegistry.class, pluginRegistry) //
.addValue(DrillbitEndpoint.class, endpoint); //
@@ -89,16 +96,27 @@ public class PhysicalPlanReader {
return physicalPlanReader.readValue(json);
}
- public FragmentRoot readFragmentOperator(String json) throws JsonProcessingException, IOException {
+ public FragmentRoot readFragmentRoot(String json) throws JsonProcessingException, IOException {
logger.debug("Attempting to read {}", json);
PhysicalOperator op = operatorReader.readValue(json);
- if(op instanceof FragmentLeaf){
+ if(op instanceof FragmentRoot){
return (FragmentRoot) op;
}else{
throw new UnsupportedOperationException(String.format("The provided json fragment doesn't have a FragmentRoot as its root operator. The operator was %s.", op.getClass().getCanonicalName()));
}
}
+ @VisibleForTesting
+ public FragmentLeaf readFragmentLeaf(String json) throws JsonProcessingException, IOException {
+ logger.debug("Attempting to read {}", json);
+ PhysicalOperator op = operatorReader.readValue(json);
+ if (op instanceof FragmentLeaf){
+ return (FragmentLeaf) op;
+ } else {
+ throw new UnsupportedOperationException(String.format("The provided json fragment is not a FragmentLeaf. The operator was %s.", op.getClass().getCanonicalName()));
+ }
+ }
+
public LogicalPlan readLogicalPlan(String json) throws JsonProcessingException, IOException{
logger.debug("Reading logical plan {}", json);
return logicalPlanReader.readValue(json);
http://git-wip-us.apache.org/repos/asf/drill/blob/0b1cb98a/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectSubScan.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectSubScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectSubScan.java
index 763ecba..ac3fe8d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectSubScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectSubScan.java
@@ -21,16 +21,28 @@ import org.apache.drill.exec.physical.base.AbstractSubScan;
import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
import org.apache.drill.exec.store.RecordReader;
-public class DirectSubScan extends AbstractSubScan{
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+import static com.fasterxml.jackson.annotation.JsonTypeInfo.Id.NAME;
+import static com.fasterxml.jackson.annotation.JsonTypeInfo.As.WRAPPER_OBJECT;
+
+@JsonTypeName("direct-sub-scan")
+public class DirectSubScan extends AbstractSubScan {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DirectSubScan.class);
+ @JsonTypeInfo(use=NAME, include=WRAPPER_OBJECT)
private final RecordReader reader;
- public DirectSubScan(RecordReader reader) {
+ @JsonCreator
+ public DirectSubScan(@JsonProperty("reader") RecordReader reader) {
super(null);
this.reader = reader;
}
+ @JsonProperty
public RecordReader getReader() {
return reader;
}
http://git-wip-us.apache.org/repos/asf/drill/blob/0b1cb98a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/AbstractPojoRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/AbstractPojoRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/AbstractPojoRecordReader.java
index 0c1144a..c2f213d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/AbstractPojoRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/AbstractPojoRecordReader.java
@@ -27,6 +27,8 @@ import org.apache.drill.exec.testing.ControlsInjectorFactory;
import org.apache.drill.exec.vector.AllocationHelper;
import org.apache.drill.exec.vector.ValueVector;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -40,7 +42,7 @@ public abstract class AbstractPojoRecordReader<T> extends AbstractRecordReader i
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractPojoRecordReader.class);
private static final ControlsInjector injector = ControlsInjectorFactory.getInjector(AbstractPojoRecordReader.class);
- protected final List<T> records;
+ @JsonProperty protected final List<T> records;
protected List<PojoWriter> writers;
private Iterator<T> currentIterator;
http://git-wip-us.apache.org/repos/asf/drill/blob/0b1cb98a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/DynamicPojoRecordReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/DynamicPojoRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/DynamicPojoRecordReader.java
index 82383f0..167c721 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/DynamicPojoRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/DynamicPojoRecordReader.java
@@ -21,7 +21,16 @@ import com.google.common.base.Preconditions;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.exec.physical.impl.OutputMutator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.util.StdConverter;
+
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -32,8 +41,10 @@ import java.util.Map;
*
* @param <T> type of given values, if contains various types, use Object class
*/
+@JsonTypeName("dynamic-pojo-record-reader")
public class DynamicPojoRecordReader<T> extends AbstractPojoRecordReader<List<T>> {
+ @JsonProperty
private final LinkedHashMap<String, Class<?>> schema;
public DynamicPojoRecordReader(LinkedHashMap<String, Class<?>> schema, List<List<T>> records) {
@@ -68,4 +79,33 @@ public class DynamicPojoRecordReader<T> extends AbstractPojoRecordReader<List<T>
"records = " + records +
"}";
}
+
+ /**
+ * A utility class that converts from {@link com.fasterxml.jackson.databind.JsonNode}
+ * to DynamicPojoRecordReader during physical plan fragment deserialization.
+ */
+ public static class Converter extends StdConverter<JsonNode, DynamicPojoRecordReader>
+ {
+ private static final TypeReference<LinkedHashMap<String, Class<?>>> schemaType =
+ new TypeReference<LinkedHashMap<String, Class<?>>>() {};
+
+ private final ObjectMapper mapper;
+
+ public Converter(ObjectMapper mapper)
+ {
+ this.mapper = mapper;
+ }
+
+ @Override
+ public DynamicPojoRecordReader convert(JsonNode value) {
+ LinkedHashMap<String, Class<?>> schema = mapper.convertValue(value.get("schema"), schemaType);
+
+ ArrayList records = new ArrayList(schema.size());
+ final Iterator<JsonNode> recordsIterator = value.get("records").get(0).elements();
+ for (Class<?> fieldType : schema.values()) {
+ records.add(mapper.convertValue(recordsIterator.next(), fieldType));
+ }
+ return new DynamicPojoRecordReader(schema, Collections.singletonList(records));
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/0b1cb98a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index 62c2307..8144da1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -493,7 +493,7 @@ public class Foreman implements Runnable {
final FragmentRoot rootOperator;
try {
- rootOperator = drillbitContext.getPlanReader().readFragmentOperator(rootFragment.getFragmentJson());
+ rootOperator = drillbitContext.getPlanReader().readFragmentRoot(rootFragment.getFragmentJson());
} catch (IOException e) {
throw new ExecutionSetupException(String.format("Unable to parse FragmentRoot from fragment: %s", rootFragment.getFragmentJson()));
}
http://git-wip-us.apache.org/repos/asf/drill/blob/0b1cb98a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
index c57869a..daa94b7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
@@ -202,7 +202,7 @@ public class FragmentExecutor implements Runnable {
// if we didn't get the root operator when the executor was created, create it now.
final FragmentRoot rootOperator = this.rootOperator != null ? this.rootOperator :
- drillbitContext.getPlanReader().readFragmentOperator(fragment.getFragmentJson());
+ drillbitContext.getPlanReader().readFragmentRoot(fragment.getFragmentJson());
root = ImplCreator.getExec(fragmentContext, rootOperator);
if (root == null) {
http://git-wip-us.apache.org/repos/asf/drill/blob/0b1cb98a/exec/java-exec/src/test/java/org/apache/drill/TestBugFixes.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestBugFixes.java b/exec/java-exec/src/test/java/org/apache/drill/TestBugFixes.java
index 1a4d63b..9b25860 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestBugFixes.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestBugFixes.java
@@ -273,4 +273,27 @@ public class TestBugFixes extends BaseTestQuery {
.baselineValues((long) 2)
.go();
}
+
+ @Test
+ public void testDRILL5269() throws Exception {
+ try {
+ test("ALTER SESSION SET `planner.enable_nljoin_for_scalar_only` = false");
+ test("ALTER SESSION SET `planner.slice_target` = 500");
+ test("\nSELECT `one` FROM (\n" +
+ " SELECT 1 `one` FROM cp.`tpch/nation.parquet`\n" +
+ " INNER JOIN (\n" +
+ " SELECT 2 `two` FROM cp.`tpch/nation.parquet`\n" +
+ " ) `t0` ON (\n" +
+ " `tpch/nation.parquet`.n_regionkey IS NOT DISTINCT FROM `t0`.`two`\n" +
+ " )\n" +
+ " GROUP BY `one`\n" +
+ ") `t1`\n" +
+ " INNER JOIN (\n" +
+ " SELECT count(1) `a_count` FROM cp.`tpch/nation.parquet`\n" +
+ ") `t5` ON TRUE\n");
+ } finally {
+ test("ALTER SESSION RESET `planner.enable_nljoin_for_scalar_only`");
+ test("ALTER SESSION RESET `planner.slice_target`");
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/0b1cb98a/exec/java-exec/src/test/java/org/apache/drill/exec/TestOpSerialization.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/TestOpSerialization.java b/exec/java-exec/src/test/java/org/apache/drill/exec/TestOpSerialization.java
index 7237183..67bd347 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/TestOpSerialization.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/TestOpSerialization.java
@@ -19,7 +19,12 @@
package org.apache.drill.exec;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
import java.util.List;
import org.apache.drill.common.config.DrillConfig;
@@ -35,18 +40,71 @@ import org.apache.drill.exec.physical.config.UnionExchange;
import org.apache.drill.exec.planner.PhysicalPlanReader;
import org.apache.drill.exec.planner.PhysicalPlanReaderTestFactory;
import org.apache.drill.exec.proto.CoordinationProtos;
+import org.apache.drill.exec.store.direct.DirectSubScan;
import org.apache.drill.exec.store.mock.MockSubScanPOP;
+import org.apache.drill.exec.store.pojo.DynamicPojoRecordReader;
+
+import org.junit.Before;
import org.junit.Test;
+import com.fasterxml.jackson.databind.ObjectWriter;
import com.google.common.collect.Lists;
public class TestOpSerialization {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestOpSerialization.class);
+ private DrillConfig config;
+ private PhysicalPlanReader reader;
+ private ObjectWriter writer;
+
+ private static PhysicalOperator setupPhysicalOperator(PhysicalOperator operator)
+ {
+ operator.setOperatorId(1);
+ operator.setCost(1.0);
+ operator.setMaxAllocation(1000);
+ return operator;
+ }
+
+ private static void assertOperator(PhysicalOperator operator)
+ {
+ assertEquals(1, operator.getOperatorId());
+ assertEquals(1.0, operator.getCost(), 0.00001);
+ assertEquals(1000, operator.getMaxAllocation());
+ }
+
+ @Before
+ public void setUp()
+ {
+ config = DrillConfig.create();
+ reader = PhysicalPlanReaderTestFactory.defaultPhysicalPlanReader(config);
+ LogicalPlanPersistence logicalPlanPersistence = PhysicalPlanReaderTestFactory.defaultLogicalPlanPersistence(config);
+ writer = logicalPlanPersistence.getMapper().writer();
+ }
+
+ @Test
+ public void testDirectSubScan() throws Exception {
+ LinkedHashMap<String, Class<?>> schema = new LinkedHashMap<>();
+ schema.put("count1", Long.class);
+ schema.put("count2", Long.class);
+
+ DirectSubScan scan = new DirectSubScan(new DynamicPojoRecordReader<>(schema,
+ Collections.singletonList(Arrays.asList(0L, 1L))));
+ scan = (DirectSubScan) reader.readFragmentLeaf(writer.writeValueAsString(setupPhysicalOperator(scan)));
+ assertOperator(scan);
+ }
+
+ @Test
+ public void testMockSubScan() throws Exception {
+ MockSubScanPOP scan = new MockSubScanPOP("abc", false, null);
+ scan.setOperatorId(1);
+ scan = (MockSubScanPOP) reader.readFragmentLeaf(writer.writeValueAsString(setupPhysicalOperator(scan)));
+ assertOperator(scan);
+ assertEquals("abc", scan.getUrl());
+ assertNull(scan.getReadEntries());
+ assertFalse(scan.isExtended());
+ }
@Test
public void testSerializedDeserialize() throws Throwable {
- DrillConfig c = DrillConfig.create();
- PhysicalPlanReader reader = PhysicalPlanReaderTestFactory.defaultPhysicalPlanReader(c);
MockSubScanPOP s = new MockSubScanPOP("abc", false, null);
s.setOperatorId(3);
Filter f = new Filter(s, new ValueExpressions.BooleanExpression("true", ExpressionPosition.UNKNOWN), 0.1f);
@@ -69,8 +127,7 @@ public class TestOpSerialization {
pops = Lists.reverse(pops);
}
PhysicalPlan plan1 = new PhysicalPlan(PlanProperties.builder().build(), pops);
- LogicalPlanPersistence logicalPlanPersistence = PhysicalPlanReaderTestFactory.defaultLogicalPlanPersistence(c);
- String json = plan1.unparse(logicalPlanPersistence.getMapper().writer());
+ String json = plan1.unparse(writer);
PhysicalPlan plan2 = reader.readPhysicalPlan(json);