You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to commits@drill.apache.org by cg...@apache.org on 2022/06/29 20:05:30 UTC

[drill] branch master updated: DRILL-8253: Support the limit results in kafka scan (#2580)

This is an automated email from the ASF dual-hosted git repository.

cgivre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new 1189e25beb DRILL-8253: Support the limit results in kafka scan (#2580)
1189e25beb is described below

commit 1189e25beb311306ccdef0d13c210b6db19285d9
Author: luoc <lu...@apache.org>
AuthorDate: Thu Jun 30 04:05:24 2022 +0800

    DRILL-8253: Support the limit results in kafka scan (#2580)
    
    * Addressed review comments
    
    Co-authored-by: luoc <lu...@qq.com>
---
 .../drill/exec/store/kafka/KafkaGroupScan.java     | 63 +++++++++++++++++++---
 .../drill/exec/store/kafka/KafkaRecordReader.java  |  5 +-
 .../exec/store/kafka/KafkaScanBatchCreator.java    |  2 +-
 .../drill/exec/store/kafka/KafkaStoragePlugin.java |  2 +-
 .../drill/exec/store/kafka/KafkaSubScan.java       | 12 ++++-
 .../drill/exec/store/kafka/KafkaQueriesTest.java   | 10 ++++
 .../drill/exec/store/kafka/TestQueryConstants.java |  1 +
 7 files changed, 81 insertions(+), 14 deletions(-)

diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaGroupScan.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaGroupScan.java
index e025e65836..45c75890a3 100644
--- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaGroupScan.java
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaGroupScan.java
@@ -30,6 +30,7 @@ import java.util.stream.Collectors;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.drill.common.PlanStringBuilder;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.SchemaPath;
@@ -76,7 +77,8 @@ public class KafkaGroupScan extends AbstractGroupScan {
   private final KafkaStoragePlugin kafkaStoragePlugin;
   private final KafkaScanSpec kafkaScanSpec;
 
-  private List<SchemaPath> columns;
+  private final List<SchemaPath> columns;
+  private final int records;
   private ListMultimap<Integer, PartitionScanWork> assignments;
   private List<EndpointAffinity> affinities;
 
@@ -86,18 +88,21 @@ public class KafkaGroupScan extends AbstractGroupScan {
   public KafkaGroupScan(@JsonProperty("userName") String userName,
                         @JsonProperty("kafkaStoragePluginConfig") KafkaStoragePluginConfig kafkaStoragePluginConfig,
                         @JsonProperty("columns") List<SchemaPath> columns,
+                        @JsonProperty("records") int records,
                         @JsonProperty("kafkaScanSpec") KafkaScanSpec scanSpec,
                         @JacksonInject StoragePluginRegistry pluginRegistry) throws ExecutionSetupException {
     this(userName,
         pluginRegistry.resolve(kafkaStoragePluginConfig, KafkaStoragePlugin.class),
         columns,
+        records,
         scanSpec);
   }
 
-  public KafkaGroupScan(KafkaStoragePlugin kafkaStoragePlugin, KafkaScanSpec kafkaScanSpec, List<SchemaPath> columns) {
+  public KafkaGroupScan(KafkaStoragePlugin kafkaStoragePlugin, KafkaScanSpec kafkaScanSpec, List<SchemaPath> columns, int records) {
     super(StringUtils.EMPTY);
     this.kafkaStoragePlugin = kafkaStoragePlugin;
     this.columns = columns;
+    this.records = records;
     this.kafkaScanSpec = kafkaScanSpec;
     init();
   }
@@ -105,10 +110,12 @@ public class KafkaGroupScan extends AbstractGroupScan {
   public KafkaGroupScan(String userName,
                         KafkaStoragePlugin kafkaStoragePlugin,
                         List<SchemaPath> columns,
+                        int records,
                         KafkaScanSpec kafkaScanSpec) {
     super(userName);
     this.kafkaStoragePlugin = kafkaStoragePlugin;
     this.columns = columns;
+    this.records = records;
     this.kafkaScanSpec = kafkaScanSpec;
     init();
   }
@@ -117,6 +124,27 @@ public class KafkaGroupScan extends AbstractGroupScan {
     super(that);
     this.kafkaStoragePlugin = that.kafkaStoragePlugin;
     this.columns = that.columns;
+    this.records = that.records;
+    this.kafkaScanSpec = that.kafkaScanSpec;
+    this.assignments = that.assignments;
+    this.partitionWorkMap = that.partitionWorkMap;
+  }
+
+  public KafkaGroupScan(KafkaGroupScan that, List<SchemaPath> columns) {
+    super(that);
+    this.kafkaStoragePlugin = that.kafkaStoragePlugin;
+    this.columns = columns;
+    this.records = that.records;
+    this.kafkaScanSpec = that.kafkaScanSpec;
+    this.assignments = that.assignments;
+    this.partitionWorkMap = that.partitionWorkMap;
+  }
+
+  public KafkaGroupScan(KafkaGroupScan that, int records) {
+    super(that);
+    this.kafkaStoragePlugin = that.kafkaStoragePlugin;
+    this.columns = that.columns;
+    this.records = records;
     this.kafkaScanSpec = that.kafkaScanSpec;
     this.assignments = that.assignments;
     this.partitionWorkMap = that.partitionWorkMap;
@@ -262,6 +290,20 @@ public class KafkaGroupScan extends AbstractGroupScan {
     assignments = AssignmentCreator.getMappings(incomingEndpoints, Lists.newArrayList(partitionWorkMap.values()));
   }
 
+  @Override
+  public GroupScan applyLimit(int maxRecords) {
+    if (maxRecords > records) { // pass the limit value into sub-scan
+      return new KafkaGroupScan(this, maxRecords);
+    } else { // stop the transform
+      return null;
+    }
+  }
+
+  @Override
+  public boolean supportsLimitPushdown() {
+    return true;
+  }
+
   @Override
   public KafkaSubScan getSpecificScan(int minorFragmentId) {
     List<PartitionScanWork> workList = assignments.get(minorFragmentId);
@@ -270,7 +312,7 @@ public class KafkaGroupScan extends AbstractGroupScan {
       .map(PartitionScanWork::getPartitionScanSpec)
       .collect(Collectors.toList());
 
-    return new KafkaSubScan(getUserName(), kafkaStoragePlugin, columns, scanSpecList);
+    return new KafkaSubScan(getUserName(), kafkaStoragePlugin, columns, records, scanSpecList);
   }
 
   @Override
@@ -314,9 +356,7 @@ public class KafkaGroupScan extends AbstractGroupScan {
 
   @Override
   public GroupScan clone(List<SchemaPath> columns) {
-    KafkaGroupScan clone = new KafkaGroupScan(this);
-    clone.columns = columns;
-    return clone;
+    return new KafkaGroupScan(this, columns);
   }
 
   public GroupScan cloneWithNewSpec(List<KafkaPartitionScanSpec> partitionScanSpecList) {
@@ -347,6 +387,11 @@ public class KafkaGroupScan extends AbstractGroupScan {
     return columns;
   }
 
+  @JsonProperty
+  public int getRecords() {
+    return records;
+  }
+
   @JsonProperty
   public KafkaScanSpec getKafkaScanSpec() {
     return kafkaScanSpec;
@@ -359,7 +404,11 @@ public class KafkaGroupScan extends AbstractGroupScan {
 
   @Override
   public String toString() {
-    return String.format("KafkaGroupScan [KafkaScanSpec=%s, columns=%s]", kafkaScanSpec, columns);
+    return new PlanStringBuilder("")
+        .field("scanSpec", kafkaScanSpec)
+        .field("columns", columns)
+        .field("records", records)
+        .toString();
   }
 
   @JsonIgnore
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java
index 551b62fed6..0d38ea19af 100644
--- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java
@@ -62,6 +62,7 @@ public class KafkaRecordReader implements ManagedReader<SchemaNegotiator> {
       }
     };
     negotiator.setErrorContext(errorContext);
+    negotiator.limit(maxRecords);
 
     messageReader = MessageReaderFactory.getMessageReader(readOptions.getMessageReader());
     messageReader.init(negotiator, readOptions, plugin);
@@ -86,10 +87,6 @@ public class KafkaRecordReader implements ManagedReader<SchemaNegotiator> {
   }
 
   private boolean nextLine(RowSetLoader rowWriter) {
-    if (rowWriter.limitReached(maxRecords)) {
-      return false;
-    }
-
     if (currentOffset >= subScanSpec.getEndOffset() || !msgItr.hasNext()) {
       return false;
     }
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaScanBatchCreator.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaScanBatchCreator.java
index 7d2ebcc413..4adfdca26a 100644
--- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaScanBatchCreator.java
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaScanBatchCreator.java
@@ -61,7 +61,7 @@ public class KafkaScanBatchCreator implements BatchCreator<KafkaSubScan> {
     builder.setUserName(subScan.getUserName());
 
     List<ManagedReader<SchemaNegotiator>> readers = subScan.getPartitionSubScanSpecList().stream()
-        .map(scanSpec -> new KafkaRecordReader(scanSpec, options, subScan.getKafkaStoragePlugin(), -1))
+        .map(scanSpec -> new KafkaRecordReader(scanSpec, options, subScan.getKafkaStoragePlugin(), subScan.getRecords()))
         .collect(Collectors.toList());
     ManagedScanFramework.ReaderFactory readerFactory = new BasicScanFactory(readers.iterator());
     builder.setReaderFactory(readerFactory);
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaStoragePlugin.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaStoragePlugin.java
index 257c0bf5a1..d80b959a04 100644
--- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaStoragePlugin.java
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaStoragePlugin.java
@@ -76,7 +76,7 @@ public class KafkaStoragePlugin extends AbstractStoragePlugin {
     KafkaScanSpec kafkaScanSpec = selection.getListWith(new ObjectMapper(),
         new TypeReference<KafkaScanSpec>() {
         });
-    return new KafkaGroupScan(this, kafkaScanSpec, null);
+    return new KafkaGroupScan(this, kafkaScanSpec, null, -1);
   }
 
   public void registerToClose(AutoCloseable autoCloseable) {
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaSubScan.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaSubScan.java
index c55f1e7234..c05ddd01d7 100644
--- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaSubScan.java
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaSubScan.java
@@ -44,6 +44,7 @@ public class KafkaSubScan extends AbstractBase implements SubScan {
 
   private final KafkaStoragePlugin kafkaStoragePlugin;
   private final List<SchemaPath> columns;
+  private final int records;
   private final List<KafkaPartitionScanSpec> partitionSubScanSpecList;
 
   @JsonCreator
@@ -51,21 +52,25 @@ public class KafkaSubScan extends AbstractBase implements SubScan {
                       @JsonProperty("userName") String userName,
                       @JsonProperty("kafkaStoragePluginConfig") KafkaStoragePluginConfig kafkaStoragePluginConfig,
                       @JsonProperty("columns") List<SchemaPath> columns,
+                      @JsonProperty("records") int records,
                       @JsonProperty("partitionSubScanSpecList") LinkedList<KafkaPartitionScanSpec> partitionSubScanSpecList)
       throws ExecutionSetupException {
     this(userName,
         registry.resolve(kafkaStoragePluginConfig, KafkaStoragePlugin.class),
         columns,
+        records,
         partitionSubScanSpecList);
   }
 
   public KafkaSubScan(String userName,
                       KafkaStoragePlugin kafkaStoragePlugin,
                       List<SchemaPath> columns,
+                      int records,
                       List<KafkaPartitionScanSpec> partitionSubScanSpecList) {
     super(userName);
     this.kafkaStoragePlugin = kafkaStoragePlugin;
     this.columns = columns;
+    this.records = records;
     this.partitionSubScanSpecList = partitionSubScanSpecList;
   }
 
@@ -77,7 +82,7 @@ public class KafkaSubScan extends AbstractBase implements SubScan {
   @Override
   public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
     Preconditions.checkArgument(children.isEmpty());
-    return new KafkaSubScan(getUserName(), kafkaStoragePlugin, columns, partitionSubScanSpecList);
+    return new KafkaSubScan(getUserName(), kafkaStoragePlugin, columns, records, partitionSubScanSpecList);
   }
 
   @Override
@@ -95,6 +100,11 @@ public class KafkaSubScan extends AbstractBase implements SubScan {
     return columns;
   }
 
+  @JsonProperty
+  public int getRecords() {
+    return records;
+  }
+
   @JsonProperty
   public List<KafkaPartitionScanSpec> getPartitionSubScanSpecList() {
     return partitionSubScanSpecList;
diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaQueriesTest.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaQueriesTest.java
index e86423e4f0..e91303610f 100644
--- a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaQueriesTest.java
+++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaQueriesTest.java
@@ -63,6 +63,16 @@ public class KafkaQueriesTest extends KafkaTestBase {
     }
   }
 
+  @Test
+  public void testResultLimit() throws Exception {
+    String queryString = String.format(TestQueryConstants.MSG_LIMIT_QUERY, TestQueryConstants.JSON_TOPIC);
+    queryBuilder()
+      .sql(queryString)
+      .planMatcher()
+      .include("Scan", "records=3")
+      .match();
+  }
+
   @Test
   public void testResultCount() {
     String queryString = String.format(TestQueryConstants.MSG_SELECT_QUERY, TestQueryConstants.JSON_TOPIC);
diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestQueryConstants.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestQueryConstants.java
index b3163adbcd..b370b9bd63 100644
--- a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestQueryConstants.java
+++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestQueryConstants.java
@@ -40,6 +40,7 @@ public interface TestQueryConstants {
   // Queries
   String MSG_COUNT_QUERY = "select count(*) from kafka.`%s`";
   String MSG_SELECT_QUERY = "select * from kafka.`%s`";
+  String MSG_LIMIT_QUERY = "select * from kafka.`%s` limit 3";
   String MIN_OFFSET_QUERY = "select MIN(kafkaMsgOffset) as minOffset from kafka.`%s`";
   String MAX_OFFSET_QUERY = "select MAX(kafkaMsgOffset) as maxOffset from kafka.`%s`";