You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by cg...@apache.org on 2022/06/29 20:05:30 UTC
[drill] branch master updated: DRILL-8253: Support the limit results in kafka scan (#2580)
This is an automated email from the ASF dual-hosted git repository.
cgivre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push:
new 1189e25beb DRILL-8253: Support the limit results in kafka scan (#2580)
1189e25beb is described below
commit 1189e25beb311306ccdef0d13c210b6db19285d9
Author: luoc <lu...@apache.org>
AuthorDate: Thu Jun 30 04:05:24 2022 +0800
DRILL-8253: Support the limit results in kafka scan (#2580)
* Addressed review comments
Co-authored-by: luoc <lu...@qq.com>
---
.../drill/exec/store/kafka/KafkaGroupScan.java | 63 +++++++++++++++++++---
.../drill/exec/store/kafka/KafkaRecordReader.java | 5 +-
.../exec/store/kafka/KafkaScanBatchCreator.java | 2 +-
.../drill/exec/store/kafka/KafkaStoragePlugin.java | 2 +-
.../drill/exec/store/kafka/KafkaSubScan.java | 12 ++++-
.../drill/exec/store/kafka/KafkaQueriesTest.java | 10 ++++
.../drill/exec/store/kafka/TestQueryConstants.java | 1 +
7 files changed, 81 insertions(+), 14 deletions(-)
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaGroupScan.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaGroupScan.java
index e025e65836..45c75890a3 100644
--- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaGroupScan.java
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaGroupScan.java
@@ -30,6 +30,7 @@ import java.util.stream.Collectors;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
import org.apache.commons.lang3.StringUtils;
+import org.apache.drill.common.PlanStringBuilder;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.SchemaPath;
@@ -76,7 +77,8 @@ public class KafkaGroupScan extends AbstractGroupScan {
private final KafkaStoragePlugin kafkaStoragePlugin;
private final KafkaScanSpec kafkaScanSpec;
- private List<SchemaPath> columns;
+ private final List<SchemaPath> columns;
+ private final int records;
private ListMultimap<Integer, PartitionScanWork> assignments;
private List<EndpointAffinity> affinities;
@@ -86,18 +88,21 @@ public class KafkaGroupScan extends AbstractGroupScan {
public KafkaGroupScan(@JsonProperty("userName") String userName,
@JsonProperty("kafkaStoragePluginConfig") KafkaStoragePluginConfig kafkaStoragePluginConfig,
@JsonProperty("columns") List<SchemaPath> columns,
+ @JsonProperty("records") int records,
@JsonProperty("kafkaScanSpec") KafkaScanSpec scanSpec,
@JacksonInject StoragePluginRegistry pluginRegistry) throws ExecutionSetupException {
+    // JSON creator: resolve the plugin instance from its config through the injected registry,
+    // then delegate to the user-name constructor, forwarding the pushed-down row limit.
+    // NOTE(review): if "records" is absent from the JSON, Jackson will most likely bind 0 to this
+    // primitive int rather than the -1 that KafkaStoragePlugin uses for a fresh scan — confirm.
this(userName,
pluginRegistry.resolve(kafkaStoragePluginConfig, KafkaStoragePlugin.class),
columns,
+ records,
scanSpec);
}
- public KafkaGroupScan(KafkaStoragePlugin kafkaStoragePlugin, KafkaScanSpec kafkaScanSpec, List<SchemaPath> columns) {
+ public KafkaGroupScan(KafkaStoragePlugin kafkaStoragePlugin, KafkaScanSpec kafkaScanSpec, List<SchemaPath> columns, int records) {
+    // Builds the scan under an empty user name (StringUtils.EMPTY), then calls init().
+    // records is the row limit handed to every sub-scan; KafkaStoragePlugin passes -1 here
+    // before any LIMIT has been pushed down.
super(StringUtils.EMPTY);
this.kafkaStoragePlugin = kafkaStoragePlugin;
this.columns = columns;
+ this.records = records;
this.kafkaScanSpec = kafkaScanSpec;
init();
}
@@ -105,10 +110,12 @@ public class KafkaGroupScan extends AbstractGroupScan {
public KafkaGroupScan(String userName,
KafkaStoragePlugin kafkaStoragePlugin,
List<SchemaPath> columns,
+ int records,
KafkaScanSpec kafkaScanSpec) {
+    // Same as the plugin-level constructor but runs as the given user, then calls init().
+    // records is the row limit handed to every sub-scan.
super(userName);
this.kafkaStoragePlugin = kafkaStoragePlugin;
this.columns = columns;
+ this.records = records;
this.kafkaScanSpec = kafkaScanSpec;
init();
}
@@ -117,6 +124,27 @@ public class KafkaGroupScan extends AbstractGroupScan {
super(that);
this.kafkaStoragePlugin = that.kafkaStoragePlugin;
this.columns = that.columns;
+ this.records = that.records;
+ this.kafkaScanSpec = that.kafkaScanSpec;
+ this.assignments = that.assignments;
+ this.partitionWorkMap = that.partitionWorkMap;
+ }
+
+  /**
+   * Copy constructor that installs a new projection list while sharing every other setting
+   * (plugin, row limit, scan spec, assignments and partition work) with {@code that}.
+   *
+   * @param that    the scan to copy
+   * @param columns the projected columns for the new scan
+   */
+  public KafkaGroupScan(KafkaGroupScan that, List<SchemaPath> columns) {
+    super(that);
+    this.columns = columns;
+    this.kafkaScanSpec = that.kafkaScanSpec;
+    this.kafkaStoragePlugin = that.kafkaStoragePlugin;
+    this.records = that.records;
+    this.partitionWorkMap = that.partitionWorkMap;
+    this.assignments = that.assignments;
+  }
+
+ public KafkaGroupScan(KafkaGroupScan that, int records) {
+ super(that);
+ this.kafkaStoragePlugin = that.kafkaStoragePlugin;
+ this.columns = that.columns;
+ this.records = records;
this.kafkaScanSpec = that.kafkaScanSpec;
this.assignments = that.assignments;
this.partitionWorkMap = that.partitionWorkMap;
@@ -262,6 +290,20 @@ public class KafkaGroupScan extends AbstractGroupScan {
assignments = AssignmentCreator.getMappings(incomingEndpoints, Lists.newArrayList(partitionWorkMap.values()));
}
+  /**
+   * Pushes a LIMIT into this scan by returning a copy whose sub-scans stop after
+   * {@code maxRecords} rows.
+   *
+   * @param maxRecords the row count requested by the LIMIT above this scan
+   * @return a new group scan carrying {@code maxRecords}, or {@code null} when that exact
+   *         limit is already applied (which stops the transform)
+   */
+  @Override
+  public GroupScan applyLimit(int maxRecords) {
+    // Push whatever limit was requested, whether tighter or looser than the current one, and
+    // return null only when it is already in place. Otherwise the planner would keep producing
+    // identical scans. A "maxRecords > records" test would only ever widen the limit and would
+    // silently drop a tighter LIMIT that arrives after a looser one.
+    // NOTE(review): assumes the LIMIT operator stays above the scan, so the scan-level limit
+    // is an optimization rather than the final cut — confirm against the push-down rule.
+    if (maxRecords == records) {
+      return null;
+    }
+    return new KafkaGroupScan(this, maxRecords);
+  }
+
+  /**
+   * Reports that a LIMIT may be pushed into this scan; the value reaches the sub-scans
+   * through {@link #applyLimit(int)}.
+   *
+   * @return always {@code true}
+   */
+  @Override
+  public boolean supportsLimitPushdown() {
+    return true;
+  }
+
@Override
public KafkaSubScan getSpecificScan(int minorFragmentId) {
List<PartitionScanWork> workList = assignments.get(minorFragmentId);
@@ -270,7 +312,7 @@ public class KafkaGroupScan extends AbstractGroupScan {
.map(PartitionScanWork::getPartitionScanSpec)
.collect(Collectors.toList());
- return new KafkaSubScan(getUserName(), kafkaStoragePlugin, columns, scanSpecList);
+ return new KafkaSubScan(getUserName(), kafkaStoragePlugin, columns, records, scanSpecList);
}
@Override
@@ -314,9 +356,7 @@ public class KafkaGroupScan extends AbstractGroupScan {
@Override
public GroupScan clone(List<SchemaPath> columns) {
- KafkaGroupScan clone = new KafkaGroupScan(this);
- clone.columns = columns;
- return clone;
+    // columns is final now, so delegate to the copy constructor that takes the new projection
+    // instead of mutating a plain copy; the row limit and assignments are carried over.
+ return new KafkaGroupScan(this, columns);
}
public GroupScan cloneWithNewSpec(List<KafkaPartitionScanSpec> partitionScanSpecList) {
@@ -347,6 +387,11 @@ public class KafkaGroupScan extends AbstractGroupScan {
return columns;
}
+  /**
+   * Returns the row limit handed to every sub-scan. KafkaStoragePlugin creates scans with -1;
+   * applyLimit() returns a copy carrying the pushed-down LIMIT value.
+   *
+   * @return the per-sub-scan row limit
+   */
+  @JsonProperty
+  public int getRecords() {
+    return this.records;
+  }
+
@JsonProperty
public KafkaScanSpec getKafkaScanSpec() {
return kafkaScanSpec;
@@ -359,7 +404,11 @@ public class KafkaGroupScan extends AbstractGroupScan {
@Override
public String toString() {
- return String.format("KafkaGroupScan [KafkaScanSpec=%s, columns=%s]", kafkaScanSpec, columns);
+    // Pass this rather than "" so the plan text keeps the "KafkaGroupScan [" prefix that the
+    // previous format had (PlanStringBuilder(Object) prefixes with the class's simple name),
+    // instead of producing a bare " [scanSpec=...]".
+    return new PlanStringBuilder(this)
+        .field("scanSpec", kafkaScanSpec)
+        .field("columns", columns)
+        .field("records", records)
+        .toString();
}
@JsonIgnore
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java
index 551b62fed6..0d38ea19af 100644
--- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java
@@ -62,6 +62,7 @@ public class KafkaRecordReader implements ManagedReader<SchemaNegotiator> {
}
};
negotiator.setErrorContext(errorContext);
+ negotiator.limit(maxRecords);
messageReader = MessageReaderFactory.getMessageReader(readOptions.getMessageReader());
messageReader.init(negotiator, readOptions, plugin);
@@ -86,10 +87,6 @@ public class KafkaRecordReader implements ManagedReader<SchemaNegotiator> {
}
private boolean nextLine(RowSetLoader rowWriter) {
- if (rowWriter.limitReached(maxRecords)) {
- return false;
- }
-
if (currentOffset >= subScanSpec.getEndOffset() || !msgItr.hasNext()) {
return false;
}
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaScanBatchCreator.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaScanBatchCreator.java
index 7d2ebcc413..4adfdca26a 100644
--- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaScanBatchCreator.java
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaScanBatchCreator.java
@@ -61,7 +61,7 @@ public class KafkaScanBatchCreator implements BatchCreator<KafkaSubScan> {
builder.setUserName(subScan.getUserName());
List<ManagedReader<SchemaNegotiator>> readers = subScan.getPartitionSubScanSpecList().stream()
- .map(scanSpec -> new KafkaRecordReader(scanSpec, options, subScan.getKafkaStoragePlugin(), -1))
+ .map(scanSpec -> new KafkaRecordReader(scanSpec, options, subScan.getKafkaStoragePlugin(), subScan.getRecords()))
.collect(Collectors.toList());
ManagedScanFramework.ReaderFactory readerFactory = new BasicScanFactory(readers.iterator());
builder.setReaderFactory(readerFactory);
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaStoragePlugin.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaStoragePlugin.java
index 257c0bf5a1..d80b959a04 100644
--- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaStoragePlugin.java
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaStoragePlugin.java
@@ -76,7 +76,7 @@ public class KafkaStoragePlugin extends AbstractStoragePlugin {
KafkaScanSpec kafkaScanSpec = selection.getListWith(new ObjectMapper(),
new TypeReference<KafkaScanSpec>() {
});
- return new KafkaGroupScan(this, kafkaScanSpec, null);
+ return new KafkaGroupScan(this, kafkaScanSpec, null, -1);
}
public void registerToClose(AutoCloseable autoCloseable) {
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaSubScan.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaSubScan.java
index c55f1e7234..c05ddd01d7 100644
--- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaSubScan.java
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaSubScan.java
@@ -44,6 +44,7 @@ public class KafkaSubScan extends AbstractBase implements SubScan {
private final KafkaStoragePlugin kafkaStoragePlugin;
private final List<SchemaPath> columns;
+ private final int records;
private final List<KafkaPartitionScanSpec> partitionSubScanSpecList;
@JsonCreator
@@ -51,21 +52,25 @@ public class KafkaSubScan extends AbstractBase implements SubScan {
@JsonProperty("userName") String userName,
@JsonProperty("kafkaStoragePluginConfig") KafkaStoragePluginConfig kafkaStoragePluginConfig,
@JsonProperty("columns") List<SchemaPath> columns,
+ @JsonProperty("records") int records,
@JsonProperty("partitionSubScanSpecList") LinkedList<KafkaPartitionScanSpec> partitionSubScanSpecList)
throws ExecutionSetupException {
this(userName,
registry.resolve(kafkaStoragePluginConfig, KafkaStoragePlugin.class),
columns,
+ records,
partitionSubScanSpecList);
}
public KafkaSubScan(String userName,
KafkaStoragePlugin kafkaStoragePlugin,
List<SchemaPath> columns,
+ int records,
List<KafkaPartitionScanSpec> partitionSubScanSpecList) {
+    // records is the per-reader row limit: KafkaGroupScan.getSpecificScan forwards its own value
+    // here, and KafkaScanBatchCreator hands it to each KafkaRecordReader.
super(userName);
this.kafkaStoragePlugin = kafkaStoragePlugin;
this.columns = columns;
+ this.records = records;
this.partitionSubScanSpecList = partitionSubScanSpecList;
}
@@ -77,7 +82,7 @@ public class KafkaSubScan extends AbstractBase implements SubScan {
@Override
public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
+    // A sub-scan accepts no children; the copy keeps the same columns, row limit and partitions.
Preconditions.checkArgument(children.isEmpty());
- return new KafkaSubScan(getUserName(), kafkaStoragePlugin, columns, partitionSubScanSpecList);
+ return new KafkaSubScan(getUserName(), kafkaStoragePlugin, columns, records, partitionSubScanSpecList);
}
@Override
@@ -95,6 +100,11 @@ public class KafkaSubScan extends AbstractBase implements SubScan {
return columns;
}
+  /**
+   * Returns the row limit that KafkaScanBatchCreator passes to every KafkaRecordReader
+   * created for this sub-scan.
+   *
+   * @return the per-reader row limit
+   */
+  @JsonProperty
+  public int getRecords() {
+    return this.records;
+  }
+
@JsonProperty
public List<KafkaPartitionScanSpec> getPartitionSubScanSpecList() {
return partitionSubScanSpecList;
diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaQueriesTest.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaQueriesTest.java
index e86423e4f0..e91303610f 100644
--- a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaQueriesTest.java
+++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaQueriesTest.java
@@ -63,6 +63,16 @@ public class KafkaQueriesTest extends KafkaTestBase {
}
}
+  /**
+   * Verifies that a LIMIT is pushed into the Kafka scan and that the query still returns
+   * exactly the requested number of rows.
+   */
+  @Test
+  public void testResultLimit() throws Exception {
+    String queryString = String.format(TestQueryConstants.MSG_LIMIT_QUERY, TestQueryConstants.JSON_TOPIC);
+    // The limit must appear on the scan in the physical plan...
+    queryBuilder()
+        .sql(queryString)
+        .planMatcher()
+        .include("Scan", "records=3")
+        .match();
+    // ...and execution must honour it. The reader no longer checks limitReached() itself, so
+    // pin the actual row count as well as the plan text.
+    org.junit.Assert.assertEquals(3L, queryBuilder().sql(queryString).run().recordCount());
+  }
+
+
@Test
public void testResultCount() {
String queryString = String.format(TestQueryConstants.MSG_SELECT_QUERY, TestQueryConstants.JSON_TOPIC);
diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestQueryConstants.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestQueryConstants.java
index b3163adbcd..b370b9bd63 100644
--- a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestQueryConstants.java
+++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestQueryConstants.java
@@ -40,6 +40,7 @@ public interface TestQueryConstants {
// Queries
String MSG_COUNT_QUERY = "select count(*) from kafka.`%s`";
String MSG_SELECT_QUERY = "select * from kafka.`%s`";
+ String MSG_LIMIT_QUERY = "select * from kafka.`%s` limit 3";
String MIN_OFFSET_QUERY = "select MIN(kafkaMsgOffset) as minOffset from kafka.`%s`";
String MAX_OFFSET_QUERY = "select MAX(kafkaMsgOffset) as maxOffset from kafka.`%s`";