You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by GitBox <gi...@apache.org> on 2018/07/05 10:24:11 UTC

[GitHub] asfgit closed pull request #1272: DRILL-5977: Filter Pushdown in Drill-Kafka plugin

asfgit closed pull request #1272: DRILL-5977: Filter Pushdown in Drill-Kafka plugin
URL: https://github.com/apache/drill/pull/1272
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaGroupScan.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaGroupScan.java
index 9cf575b345..976c82a0ff 100644
--- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaGroupScan.java
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaGroupScan.java
@@ -19,10 +19,12 @@
 
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import com.google.common.collect.Sets;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.exceptions.UserException;
@@ -35,7 +37,6 @@
 import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.store.StoragePluginRegistry;
-import org.apache.drill.exec.store.kafka.KafkaSubScan.KafkaSubScanSpec;
 import org.apache.drill.exec.store.schedule.AffinityCreator;
 import org.apache.drill.exec.store.schedule.AssignmentCreator;
 import org.apache.drill.exec.store.schedule.CompleteWork;
@@ -72,10 +73,11 @@
   private final KafkaScanSpec kafkaScanSpec;
 
   private List<SchemaPath> columns;
-  private List<PartitionScanWork> partitionWorkList;
   private ListMultimap<Integer, PartitionScanWork> assignments;
   private List<EndpointAffinity> affinities;
 
+  private Map<TopicPartition, PartitionScanWork> partitionWorkMap;
+
   @JsonCreator
   public KafkaGroupScan(@JsonProperty("userName") String userName,
                         @JsonProperty("kafkaStoragePluginConfig") KafkaStoragePluginConfig kafkaStoragePluginConfig,
@@ -112,34 +114,18 @@ public KafkaGroupScan(KafkaGroupScan that) {
     this.kafkaStoragePlugin = that.kafkaStoragePlugin;
     this.columns = that.columns;
     this.kafkaScanSpec = that.kafkaScanSpec;
-    this.partitionWorkList = that.partitionWorkList;
     this.assignments = that.assignments;
+    this.partitionWorkMap = that.partitionWorkMap;
   }
 
-  private static class PartitionScanWork implements CompleteWork {
-
-    private final EndpointByteMapImpl byteMap = new EndpointByteMapImpl();
-
-    private final TopicPartition topicPartition;
-    private final long beginOffset;
-    private final long latestOffset;
-
-    public PartitionScanWork(TopicPartition topicPartition, long beginOffset, long latestOffset) {
-      this.topicPartition = topicPartition;
-      this.beginOffset = beginOffset;
-      this.latestOffset = latestOffset;
-    }
-
-    public TopicPartition getTopicPartition() {
-      return topicPartition;
-    }
+  public static class PartitionScanWork implements CompleteWork {
 
-    public long getBeginOffset() {
-      return beginOffset;
-    }
+    private final EndpointByteMapImpl byteMap;
+    private final KafkaPartitionScanSpec partitionScanSpec;
 
-    public long getLatestOffset() {
-      return latestOffset;
+    public PartitionScanWork(EndpointByteMap byteMap, KafkaPartitionScanSpec partitionScanSpec) {
+      this.byteMap = (EndpointByteMapImpl)byteMap;
+      this.partitionScanSpec = partitionScanSpec;
     }
 
     @Override
@@ -149,7 +135,7 @@ public int compareTo(CompleteWork o) {
 
     @Override
     public long getTotalBytes() {
-      return (latestOffset - beginOffset) * MSG_SIZE;
+      return (partitionScanSpec.getEndOffset() - partitionScanSpec.getStartOffset()) * MSG_SIZE;
     }
 
     @Override
@@ -157,6 +143,9 @@ public EndpointByteMap getByteMap() {
       return byteMap;
     }
 
+    public KafkaPartitionScanSpec getPartitionScanSpec() {
+      return partitionScanSpec;
+    }
   }
 
   /**
@@ -164,7 +153,7 @@ public EndpointByteMap getByteMap() {
    * corresponding topicPartition
    */
   private void init() {
-    partitionWorkList = Lists.newArrayList();
+    partitionWorkMap = Maps.newHashMap();
     Collection<DrillbitEndpoint> endpoints = kafkaStoragePlugin.getContext().getBits();
     Map<String, DrillbitEndpoint> endpointMap = Maps.newHashMap();
     for (DrillbitEndpoint endpoint : endpoints) {
@@ -211,12 +200,13 @@ private void init() {
 
     // computes work for each end point
     for (PartitionInfo partitionInfo : topicPartitions) {
-      TopicPartition topicPartition = new TopicPartition(topicName, partitionInfo.partition());
+      TopicPartition topicPartition = new TopicPartition(partitionInfo.topic(), partitionInfo.partition());
       long lastCommittedOffset = startOffsetsMap.get(topicPartition);
       long latestOffset = endOffsetsMap.get(topicPartition);
       logger.debug("Latest offset of {} is {}", topicPartition, latestOffset);
       logger.debug("Last committed offset of {} is {}", topicPartition, lastCommittedOffset);
-      PartitionScanWork work = new PartitionScanWork(topicPartition, lastCommittedOffset, latestOffset);
+      KafkaPartitionScanSpec partitionScanSpec = new KafkaPartitionScanSpec(topicPartition.topic(), topicPartition.partition(), lastCommittedOffset, latestOffset);
+      PartitionScanWork work = new PartitionScanWork(new EndpointByteMapImpl(), partitionScanSpec);
       Node[] inSyncReplicas = partitionInfo.inSyncReplicas();
       for (Node isr : inSyncReplicas) {
         String host = isr.host();
@@ -225,23 +215,22 @@ private void init() {
           work.getByteMap().add(ep, work.getTotalBytes());
         }
       }
-      partitionWorkList.add(work);
+      partitionWorkMap.put(topicPartition, work);
     }
   }
 
   @Override
   public void applyAssignments(List<DrillbitEndpoint> incomingEndpoints) {
-    assignments = AssignmentCreator.getMappings(incomingEndpoints, partitionWorkList);
+    assignments = AssignmentCreator.getMappings(incomingEndpoints, Lists.newArrayList(partitionWorkMap.values()));
   }
 
   @Override
   public KafkaSubScan getSpecificScan(int minorFragmentId) {
     List<PartitionScanWork> workList = assignments.get(minorFragmentId);
-    List<KafkaSubScanSpec> scanSpecList = Lists.newArrayList();
+    List<KafkaPartitionScanSpec> scanSpecList = Lists.newArrayList();
 
     for (PartitionScanWork work : workList) {
-      scanSpecList.add(new KafkaSubScanSpec(work.getTopicPartition().topic(), work.getTopicPartition().partition(),
-          work.getBeginOffset(), work.getLatestOffset()));
+      scanSpecList.add(work.partitionScanSpec);
     }
 
     return new KafkaSubScan(getUserName(), kafkaStoragePlugin, columns, scanSpecList);
@@ -249,14 +238,14 @@ public KafkaSubScan getSpecificScan(int minorFragmentId) {
 
   @Override
   public int getMaxParallelizationWidth() {
-    return partitionWorkList.size();
+    return partitionWorkMap.values().size();
   }
 
   @Override
   public ScanStats getScanStats() {
     long messageCount = 0;
-    for (PartitionScanWork work : partitionWorkList) {
-      messageCount += (work.getLatestOffset() - work.getBeginOffset());
+    for (PartitionScanWork work : partitionWorkMap.values()) {
+      messageCount += (work.getPartitionScanSpec().getEndOffset() - work.getPartitionScanSpec().getStartOffset());
     }
     return new ScanStats(GroupScanProperty.EXACT_ROW_COUNT, messageCount, 1, messageCount * MSG_SIZE);
   }
@@ -275,7 +264,7 @@ public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) thro
   @Override
   public List<EndpointAffinity> getOperatorAffinity() {
     if (affinities == null) {
-      affinities = AffinityCreator.getAffinityMap(partitionWorkList);
+      affinities = AffinityCreator.getAffinityMap(Lists.newArrayList(partitionWorkMap.values()));
     }
     return affinities;
   }
@@ -293,6 +282,23 @@ public GroupScan clone(List<SchemaPath> columns) {
     return clone;
   }
 
+  public GroupScan cloneWithNewSpec(List<KafkaPartitionScanSpec> partitionScanSpecList) {
+    KafkaGroupScan clone = new KafkaGroupScan(this);
+    HashSet<TopicPartition> partitionsInSpec = Sets.newHashSet();
+
+    for(KafkaPartitionScanSpec scanSpec : partitionScanSpecList) {
+      TopicPartition tp = new TopicPartition(scanSpec.getTopicName(), scanSpec.getPartitionId());
+      partitionsInSpec.add(tp);
+
+      PartitionScanWork newScanWork = new PartitionScanWork(partitionWorkMap.get(tp).getByteMap(), scanSpec);
+      clone.partitionWorkMap.put(tp, newScanWork);
+    }
+
+    //Remove unnecessary partitions from partitionWorkMap
+    clone.partitionWorkMap.keySet().removeIf(tp -> !partitionsInSpec.contains(tp));
+    return clone;
+  }
+
   @JsonProperty
   public KafkaStoragePluginConfig getKafkaStoragePluginConfig() {
     return kafkaStoragePlugin.getConfig();
@@ -318,4 +324,12 @@ public String toString() {
     return String.format("KafkaGroupScan [KafkaScanSpec=%s, columns=%s]", kafkaScanSpec, columns);
   }
 
+  @JsonIgnore
+  public List<KafkaPartitionScanSpec> getPartitionScanSpecList() {
+    List<KafkaPartitionScanSpec> partitionScanSpecList = Lists.newArrayList();
+    for (PartitionScanWork work : partitionWorkMap.values()) {
+      partitionScanSpecList.add(work.partitionScanSpec.clone());
+    }
+    return partitionScanSpecList;
+  }
 }
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaNodeProcessor.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaNodeProcessor.java
new file mode 100644
index 0000000000..ba39b76b8f
--- /dev/null
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaNodeProcessor.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import org.apache.drill.common.expression.FunctionCall;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.expression.ValueExpressions.BooleanExpression;
+import org.apache.drill.common.expression.ValueExpressions.DateExpression;
+import org.apache.drill.common.expression.ValueExpressions.DoubleExpression;
+import org.apache.drill.common.expression.ValueExpressions.FloatExpression;
+import org.apache.drill.common.expression.ValueExpressions.IntExpression;
+import org.apache.drill.common.expression.ValueExpressions.LongExpression;
+import org.apache.drill.common.expression.ValueExpressions.QuotedString;
+import org.apache.drill.common.expression.ValueExpressions.TimeExpression;
+import org.apache.drill.common.expression.ValueExpressions.TimeStampExpression;
+import org.apache.drill.common.expression.visitors.AbstractExprVisitor;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+
+class KafkaNodeProcessor extends AbstractExprVisitor<Boolean, LogicalExpression, RuntimeException> {
+
+  private String functionName;
+  private Boolean success;
+  private Long value;
+  private String path;
+
+  public KafkaNodeProcessor(String functionName) {
+    this.functionName = functionName;
+    this.success = false;
+  }
+
+  public static boolean isPushdownFunction(String functionName) {
+    return COMPARE_FUNCTIONS_TRANSPOSE_MAP.keySet().contains(functionName);
+  }
+
+  @Override
+  public Boolean visitUnknown(LogicalExpression e, LogicalExpression valueArg) throws RuntimeException {
+    return false;
+  }
+
+  public static KafkaNodeProcessor process(FunctionCall call) {
+    String functionName = call.getName();
+    LogicalExpression nameArg = call.args.get(0);
+    LogicalExpression valueArg = call.args.size() >= 2? call.args.get(1) : null;
+    KafkaNodeProcessor evaluator = new KafkaNodeProcessor(functionName);
+
+    if (VALUE_EXPRESSION_CLASSES.contains(nameArg.getClass())) {
+      LogicalExpression swapArg = valueArg;
+      valueArg = nameArg;
+      nameArg = swapArg;
+      evaluator.functionName = COMPARE_FUNCTIONS_TRANSPOSE_MAP.get(functionName);
+    }
+    evaluator.success = nameArg.accept(evaluator, valueArg);
+    return evaluator;
+  }
+
+  public boolean isSuccess() {
+    // TODO Auto-generated method stub
+    return success;
+  }
+
+  public String getPath() {
+    return path;
+  }
+
+  public Long getValue() {
+    return value;
+  }
+
+  public String getFunctionName() {
+    return functionName;
+  }
+
+  @Override
+  public Boolean visitSchemaPath(SchemaPath path, LogicalExpression valueArg) throws RuntimeException {
+    this.path = path.getRootSegmentPath();
+
+    if(valueArg == null) {
+      return false;
+    }
+
+    switch (this.path) {
+      case "kafkaMsgOffset":
+        /*
+         * Do not pushdown "not_equal" on kafkaMsgOffset.
+         */
+        if(functionName.equals("not_equal")) {
+          return false;
+        }
+      case "kafkaPartitionId":
+        if(valueArg instanceof IntExpression) {
+          value = (long) ((IntExpression) valueArg).getInt();
+          return true;
+        }
+
+        if(valueArg instanceof LongExpression) {
+          value = ((LongExpression) valueArg).getLong();
+          return true;
+        }
+        break;
+      case "kafkaMsgTimestamp":
+        /*
+        Only pushdown "equal", "greater_than", "greater_than_or_equal" on kafkaMsgTimestamp
+         */
+        if(!functionName.equals("equal") && !functionName.equals("greater_than")
+               && !functionName.equals("greater_than_or_equal_to")) {
+          return false;
+        }
+
+        if(valueArg instanceof LongExpression) {
+          value = ((LongExpression) valueArg).getLong();
+          return true;
+        }
+
+        if (valueArg instanceof DateExpression) {
+          value = ((DateExpression)valueArg).getDate();
+          return true;
+        }
+
+        if (valueArg instanceof TimeExpression) {
+          value = (long) ((TimeExpression)valueArg).getTime();
+          return true;
+        }
+
+        if (valueArg instanceof TimeStampExpression) {
+          value = ((TimeStampExpression) valueArg).getTimeStamp();
+          return true;
+        }
+
+        if(valueArg instanceof IntExpression) {
+          value = (long) ((IntExpression) valueArg).getInt();
+          return true;
+        }
+        break;
+    }
+    return false;
+  }
+
+  private static final ImmutableSet<Class<? extends LogicalExpression>> VALUE_EXPRESSION_CLASSES;
+  static {
+    ImmutableSet.Builder<Class<? extends LogicalExpression>> builder = ImmutableSet.builder();
+    VALUE_EXPRESSION_CLASSES = builder
+                                   .add(BooleanExpression.class)
+                                   .add(DateExpression.class)
+                                   .add(DoubleExpression.class)
+                                   .add(FloatExpression.class)
+                                   .add(IntExpression.class)
+                                   .add(LongExpression.class)
+                                   .add(QuotedString.class)
+                                   .add(TimeExpression.class)
+                                   .build();
+  }
+
+  private static final ImmutableMap<String, String> COMPARE_FUNCTIONS_TRANSPOSE_MAP;
+  static {
+    ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
+    COMPARE_FUNCTIONS_TRANSPOSE_MAP = builder
+                                          .put("equal", "equal")
+                                          .put("not_equal", "not_equal")
+                                          .put("greater_than_or_equal_to", "less_than_or_equal_to")
+                                          .put("greater_than", "less_than")
+                                          .put("less_than_or_equal_to", "greater_than_or_equal_to")
+                                          .put("less_than", "greater_than")
+                                          .build();
+  }
+
+}
+
+
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaPartitionScanSpec.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaPartitionScanSpec.java
new file mode 100644
index 0000000000..713f62e9bb
--- /dev/null
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaPartitionScanSpec.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class KafkaPartitionScanSpec {
+  private String topicName;
+  private int partitionId;
+  private long startOffset;
+  private long endOffset;
+
+  @JsonCreator
+  public KafkaPartitionScanSpec(@JsonProperty("topicName") String topicName,
+                                @JsonProperty("partitionId") int partitionId,
+                                @JsonProperty("startOffset") long startOffset,
+                                @JsonProperty("endOffset") long endOffset) {
+    this.topicName = topicName;
+    this.partitionId = partitionId;
+    this.startOffset = startOffset;
+    this.endOffset = endOffset;
+  }
+
+  public String getTopicName() {
+    return topicName;
+  }
+
+  public int getPartitionId() {
+    return partitionId;
+  }
+
+  public long getStartOffset() {
+    return startOffset;
+  }
+
+  public long getEndOffset() {
+    return endOffset;
+  }
+
+  public void mergeScanSpec(String functionName, KafkaPartitionScanSpec scanSpec) {
+    switch (functionName) {
+      case "booleanAnd":
+        //Reduce the scan range
+        if(startOffset < scanSpec.startOffset) {
+          startOffset = scanSpec.startOffset;
+        }
+
+        if(endOffset > scanSpec.endOffset) {
+          endOffset = scanSpec.endOffset;
+        }
+        break;
+      case "booleanOr":
+        //Increase the scan range
+        if(scanSpec.startOffset < startOffset) {
+          startOffset = scanSpec.startOffset;
+        }
+
+        if(scanSpec.endOffset > endOffset) {
+          endOffset = scanSpec.endOffset;
+        }
+        break;
+    }
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if(obj instanceof KafkaPartitionScanSpec) {
+      KafkaPartitionScanSpec that = ((KafkaPartitionScanSpec)obj);
+      return this.topicName.equals(that.topicName) && this.partitionId == that.partitionId
+                 && this.startOffset == that.startOffset && this.endOffset == that.endOffset;
+    }
+    return false;
+  }
+
+  @Override
+  public String toString() {
+    return "KafkaPartitionScanSpec [topicName=" + topicName + ", partitionId=" + partitionId + ", startOffset="
+               + startOffset + ", endOffset=" + endOffset + "]";
+  }
+
+  public KafkaPartitionScanSpec clone() {
+    return new KafkaPartitionScanSpec(topicName, partitionId, startOffset, endOffset);
+  }
+}
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaPartitionScanSpecBuilder.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaPartitionScanSpecBuilder.java
new file mode 100644
index 0000000000..b52ed44208
--- /dev/null
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaPartitionScanSpecBuilder.java
@@ -0,0 +1,345 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.drill.common.expression.BooleanOperator;
+import org.apache.drill.common.expression.FunctionCall;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.visitors.AbstractExprVisitor;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+public class KafkaPartitionScanSpecBuilder extends
+    AbstractExprVisitor<List<KafkaPartitionScanSpec>,Void,RuntimeException> {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(KafkaPartitionScanSpecBuilder.class);
+  private final LogicalExpression le;
+  private final KafkaGroupScan groupScan;
+  private final KafkaConsumer<? ,?> kafkaConsumer;
+  private ImmutableMap<TopicPartition, KafkaPartitionScanSpec> fullScanSpec;
+  private static final long CLOSE_TIMEOUT_MS = 200;
+
+  public KafkaPartitionScanSpecBuilder(KafkaGroupScan groupScan, LogicalExpression conditionExp) {
+    this.groupScan = groupScan;
+    kafkaConsumer = new KafkaConsumer<>(groupScan.getKafkaStoragePluginConfig().getKafkaConsumerProps(),
+        new ByteArrayDeserializer(), new ByteArrayDeserializer());
+    le = conditionExp;
+  }
+
+  public List<KafkaPartitionScanSpec> parseTree() {
+    ImmutableMap.Builder<TopicPartition, KafkaPartitionScanSpec> builder = ImmutableMap.builder();
+    for(KafkaPartitionScanSpec scanSpec : groupScan.getPartitionScanSpecList()) {
+      builder.put(new TopicPartition(scanSpec.getTopicName(), scanSpec.getPartitionId()), scanSpec);
+    }
+    fullScanSpec = builder.build();
+    List<KafkaPartitionScanSpec> pushdownSpec = le.accept(this, null);
+
+    /*
+    Non-existing / invalid partitions may result in empty scan spec.
+    This results in a "ScanBatch" with no reader. DRILL currently requires
+    at least one reader to be present in a scan batch.
+     */
+    if(pushdownSpec != null && pushdownSpec.isEmpty()) {
+      TopicPartition firstPartition = new TopicPartition(groupScan.getKafkaScanSpec().getTopicName(), 0);
+      KafkaPartitionScanSpec emptySpec =
+          new KafkaPartitionScanSpec(firstPartition.topic(),firstPartition.partition(),
+              fullScanSpec.get(firstPartition).getEndOffset(), fullScanSpec.get(firstPartition).getEndOffset());
+      pushdownSpec.add(emptySpec);
+    }
+    return pushdownSpec;
+  }
+
+  @Override
+  public List<KafkaPartitionScanSpec> visitUnknown(LogicalExpression e, Void value)
+      throws RuntimeException {
+    return null;
+  }
+
+  @Override
+  public List<KafkaPartitionScanSpec> visitBooleanOperator(BooleanOperator op, Void value)
+      throws RuntimeException {
+
+    Map<TopicPartition, KafkaPartitionScanSpec> specMap = Maps.newHashMap();
+    ImmutableList<LogicalExpression> args = op.args;
+    if(op.getName().equals("booleanOr")) {
+
+      for(LogicalExpression expr : args) {
+        List<KafkaPartitionScanSpec> parsedSpec = expr.accept(this, null);
+        //parsedSpec is null if expression cannot be pushed down
+        if(parsedSpec != null) {
+          for(KafkaPartitionScanSpec newSpec : parsedSpec) {
+            TopicPartition tp = new TopicPartition(newSpec.getTopicName(), newSpec.getPartitionId());
+            KafkaPartitionScanSpec existingSpec = specMap.get(tp);
+
+            //If existing spec does not contain topic-partition
+            if(existingSpec == null) {
+              specMap.put(tp, newSpec); //Add topic-partition to spec for OR
+            } else {
+              existingSpec.mergeScanSpec(op.getName(), newSpec);
+              specMap.put(tp, existingSpec);
+            }
+          }
+        } else {
+          return null; //At any level, all arguments of booleanOr should support pushdown, else return null
+        }
+      }
+    } else { //booleanAnd
+      specMap.putAll(fullScanSpec);
+      for(LogicalExpression expr : args) {
+        List<KafkaPartitionScanSpec> parsedSpec = expr.accept(this, null);
+
+        //parsedSpec is null if expression cannot be pushed down
+        if(parsedSpec != null) {
+          Set<TopicPartition> partitionsInNewSpec = Sets.newHashSet(); //Store topic-partitions returned from new spec.
+
+          for (KafkaPartitionScanSpec newSpec : parsedSpec) {
+            TopicPartition tp = new TopicPartition(newSpec.getTopicName(), newSpec.getPartitionId());
+            partitionsInNewSpec.add(tp);
+            KafkaPartitionScanSpec existingSpec = specMap.get(tp);
+
+            if (existingSpec != null) {
+              existingSpec.mergeScanSpec(op.getName(), newSpec);
+              specMap.put(tp, existingSpec);
+            }
+          }
+
+          /*
+          For "booleanAnd", handle the case where condition is on `kafkaPartitionId`.
+          In this case, we would not want unnecessarily scan all the topic-partitions.
+          Hence we remove the unnecessary topic-partitions from the spec.
+         */
+          specMap.keySet().removeIf(partition -> !partitionsInNewSpec.contains(partition));
+        }
+
+      }
+    }
+    return Lists.newArrayList(specMap.values());
+  }
+
+  @Override
+  public List<KafkaPartitionScanSpec> visitFunctionCall(FunctionCall call, Void value)
+      throws RuntimeException {
+
+    String functionName = call.getName();
+    if(KafkaNodeProcessor.isPushdownFunction(functionName)) {
+
+      KafkaNodeProcessor kafkaNodeProcessor = KafkaNodeProcessor.process(call);
+      if(kafkaNodeProcessor.isSuccess()) {
+        switch (kafkaNodeProcessor.getPath()) {
+          case "kafkaMsgTimestamp":
+            return createScanSpecForTimestamp(kafkaNodeProcessor.getFunctionName(),
+                kafkaNodeProcessor.getValue());
+          case "kafkaMsgOffset":
+            return createScanSpecForOffset(kafkaNodeProcessor.getFunctionName(),
+                kafkaNodeProcessor.getValue());
+          case "kafkaPartitionId":
+            return createScanSpecForPartition(kafkaNodeProcessor.getFunctionName(),
+                kafkaNodeProcessor.getValue());
+        }
+      }
+    }
+    return null; //Return null, do not pushdown
+  }
+
+
+  private List<KafkaPartitionScanSpec> createScanSpecForTimestamp(String functionName,
+                                                                  Long fieldValue) {
+    List<KafkaPartitionScanSpec> scanSpec = Lists.newArrayList();
+    Map<TopicPartition, Long> timesValMap = Maps.newHashMap();
+    ImmutableSet<TopicPartition> topicPartitions = fullScanSpec.keySet();
+
+    for(TopicPartition partitions : topicPartitions) {
+      timesValMap.put(partitions, functionName.equals("greater_than") ? fieldValue+1 : fieldValue);
+    }
+
+    Map<TopicPartition, OffsetAndTimestamp> offsetAndTimestamp = kafkaConsumer.offsetsForTimes(timesValMap);
+
+    for(TopicPartition tp : topicPartitions) {
+      OffsetAndTimestamp value = offsetAndTimestamp.get(tp);
+      //OffsetAndTimestamp is null if there is no offset greater or equal to requested timestamp
+      if(value == null) {
+        scanSpec.add(
+            new KafkaPartitionScanSpec(tp.topic(), tp.partition(),
+                fullScanSpec.get(tp).getEndOffset(), fullScanSpec.get(tp).getEndOffset()));
+      } else {
+        scanSpec.add(
+            new KafkaPartitionScanSpec(tp.topic(), tp.partition(),
+                value.offset(), fullScanSpec.get(tp).getEndOffset()));
+      }
+    }
+
+    return scanSpec;
+  }
+
+  private List<KafkaPartitionScanSpec> createScanSpecForOffset(String functionName,
+                                                               Long fieldValue) {
+    List<KafkaPartitionScanSpec> scanSpec = Lists.newArrayList();
+    ImmutableSet<TopicPartition> topicPartitions = fullScanSpec.keySet();
+
+    /*
+    We should handle the case where the specified offset does not exist in the current context,
+    i.e., fieldValue < startOffset or fieldValue > endOffset in a particular topic-partition.
+    Else, KafkaConsumer.poll will throw "TimeoutException".
+    */
+
+    switch (functionName) {
+      case "equal":
+        for(TopicPartition tp : topicPartitions) {
+          if(fieldValue < fullScanSpec.get(tp).getStartOffset()) {
+            //Offset does not exist
+            scanSpec.add(
+                new KafkaPartitionScanSpec(tp.topic(), tp.partition(),
+                    fullScanSpec.get(tp).getEndOffset(), fullScanSpec.get(tp).getEndOffset()));
+          } else {
+            long val = Math.min(fieldValue, fullScanSpec.get(tp).getEndOffset());
+            long nextVal = Math.min(val+1, fullScanSpec.get(tp).getEndOffset());
+            scanSpec.add(new KafkaPartitionScanSpec(tp.topic(), tp.partition(), val, nextVal));
+          }
+        }
+        break;
+      case "greater_than_or_equal_to":
+        for(TopicPartition tp : topicPartitions) {
+          //Ensure scan range is between startOffset and endOffset,
+          long val = bindOffsetToRange(tp, fieldValue);
+          scanSpec.add(
+              new KafkaPartitionScanSpec(tp.topic(), tp.partition(), val,
+                  fullScanSpec.get(tp).getEndOffset()));
+        }
+        break;
+      case "greater_than":
+        for(TopicPartition tp : topicPartitions) {
+          //Ensure scan range is between startOffset and endOffset,
+          long val = bindOffsetToRange(tp, fieldValue+1);
+          scanSpec.add(
+              new KafkaPartitionScanSpec(tp.topic(), tp.partition(),
+                  val, fullScanSpec.get(tp).getEndOffset()));
+        }
+        break;
+      case "less_than_or_equal_to":
+        for(TopicPartition tp : topicPartitions) {
+          //Ensure scan range is between startOffset and endOffset,
+          long val = bindOffsetToRange(tp, fieldValue+1);
+
+          scanSpec.add(
+              new KafkaPartitionScanSpec(tp.topic(), tp.partition(),
+                  fullScanSpec.get(tp).getStartOffset(), val));
+        }
+        break;
+      case "less_than":
+        for(TopicPartition tp : topicPartitions) {
+          //Ensure scan range is between startOffset and endOffset,
+          long val = bindOffsetToRange(tp, fieldValue);
+
+          scanSpec.add(
+              new KafkaPartitionScanSpec(tp.topic(), tp.partition(),
+                  fullScanSpec.get(tp).getStartOffset(), val));
+        }
+        break;
+    }
+    return scanSpec;
+  }
+
+  private List<KafkaPartitionScanSpec> createScanSpecForPartition(String functionName,
+                                                                  Long fieldValue) {
+    List<KafkaPartitionScanSpec> scanSpecList = Lists.newArrayList();
+    ImmutableSet<TopicPartition> topicPartitions = fullScanSpec.keySet();
+
+    switch (functionName) {
+      case "equal":
+        for(TopicPartition tp : topicPartitions) {
+          if(tp.partition() == fieldValue) {
+            scanSpecList.add(
+                new KafkaPartitionScanSpec(tp.topic(), tp.partition(),
+                    fullScanSpec.get(tp).getStartOffset(),
+                    fullScanSpec.get(tp).getEndOffset()));
+          }
+        }
+        break;
+      case "not_equal":
+        for(TopicPartition tp : topicPartitions) {
+          if(tp.partition() != fieldValue) {
+            scanSpecList.add(
+                new KafkaPartitionScanSpec(tp.topic(), tp.partition(),
+                    fullScanSpec.get(tp).getStartOffset(),
+                    fullScanSpec.get(tp).getEndOffset()));
+          }
+        }
+        break;
+      case "greater_than_or_equal_to":
+        for(TopicPartition tp : topicPartitions) {
+          if(tp.partition() >= fieldValue) {
+            scanSpecList.add(
+                new KafkaPartitionScanSpec(tp.topic(), tp.partition(),
+                    fullScanSpec.get(tp).getStartOffset(),
+                    fullScanSpec.get(tp).getEndOffset()));
+          }
+        }
+        break;
+      case "greater_than":
+        for(TopicPartition tp : topicPartitions) {
+          if(tp.partition() > fieldValue) {
+            scanSpecList.add(
+                new KafkaPartitionScanSpec(tp.topic(), tp.partition(),
+                    fullScanSpec.get(tp).getStartOffset(),
+                    fullScanSpec.get(tp).getEndOffset()));
+          }
+        }
+        break;
+      case "less_than_or_equal_to":
+        for(TopicPartition tp : topicPartitions) {
+          if(tp.partition() <= fieldValue) {
+            scanSpecList.add(
+                new KafkaPartitionScanSpec(tp.topic(), tp.partition(),
+                    fullScanSpec.get(tp).getStartOffset(),
+                    fullScanSpec.get(tp).getEndOffset()));
+          }
+        }
+        break;
+      case "less_than":
+        for(TopicPartition tp : topicPartitions) {
+          if(tp.partition() < fieldValue) {
+            scanSpecList.add(
+                new KafkaPartitionScanSpec(tp.topic(), tp.partition(),
+                    fullScanSpec.get(tp).getStartOffset(),
+                    fullScanSpec.get(tp).getEndOffset()));
+          }
+        }
+        break;
+    }
+    return scanSpecList;
+  }
+
+  void close() {
+    kafkaConsumer.close(CLOSE_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+  }
+
+  private long bindOffsetToRange(TopicPartition tp, long offset) {
+    return Math.max(fullScanSpec.get(tp).getStartOffset(), Math.min(offset, fullScanSpec.get(tp).getEndOffset()));
+  }
+}
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaPushDownFilterIntoScan.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaPushDownFilterIntoScan.java
new file mode 100644
index 0000000000..bf11f852c9
--- /dev/null
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaPushDownFilterIntoScan.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelOptRuleOperand;
+import org.apache.calcite.rex.RexNode;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.planner.logical.DrillOptiq;
+import org.apache.drill.exec.planner.logical.DrillParseContext;
+import org.apache.drill.exec.planner.logical.RelOptHelper;
+import org.apache.drill.exec.planner.physical.FilterPrel;
+import org.apache.drill.exec.planner.physical.PrelUtil;
+import org.apache.drill.exec.planner.physical.ScanPrel;
+import org.apache.drill.exec.store.StoragePluginOptimizerRule;
+import java.util.List;
+
+public class KafkaPushDownFilterIntoScan extends StoragePluginOptimizerRule {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(KafkaPushDownFilterIntoScan.class);
+
+  public static final StoragePluginOptimizerRule INSTANCE =
+      new KafkaPushDownFilterIntoScan(RelOptHelper.some(FilterPrel.class, RelOptHelper.any(ScanPrel.class)),
+          "KafkaPushFilterIntoScan:Filter_On_Scan");
+
+  private KafkaPushDownFilterIntoScan(RelOptRuleOperand operand, String description) {
+    super(operand, description);
+  }
+
+  @Override
+  public void onMatch(RelOptRuleCall call) {
+    final ScanPrel scan = call.rel(1);
+    final FilterPrel filter = call.rel(0);
+    final RexNode condition = filter.getCondition();
+
+    LogicalExpression conditionExp =
+        DrillOptiq.toDrill(new DrillParseContext(PrelUtil.getPlannerSettings(call.getPlanner())), scan, condition);
+
+    KafkaGroupScan groupScan = (KafkaGroupScan) scan.getGroupScan();
+    logger.info("Partitions ScanSpec before pushdown: " + groupScan.getPartitionScanSpecList());
+    KafkaPartitionScanSpecBuilder builder = new KafkaPartitionScanSpecBuilder(groupScan, conditionExp);
+    List<KafkaPartitionScanSpec> newScanSpec = null;
+    newScanSpec = builder.parseTree();
+    builder.close(); //Close consumer
+
+    //No pushdown
+    if(newScanSpec == null) {
+      return;
+    }
+
+    logger.info("Partitions ScanSpec after pushdown: " + newScanSpec);
+    GroupScan newGroupScan = groupScan.cloneWithNewSpec(newScanSpec);
+    final ScanPrel newScanPrel = ScanPrel.create(scan, filter.getTraitSet(), newGroupScan, scan.getRowType());
+    call.transformTo(filter.copy(filter.getTraitSet(), ImmutableList.of(newScanPrel)));
+  }
+
+  @Override
+  public boolean matches(RelOptRuleCall call) {
+    final ScanPrel scan = (ScanPrel) call.rel(1);
+    if (scan.getGroupScan() instanceof KafkaGroupScan) {
+      return super.matches(call);
+    }
+    return false;
+  }
+}
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java
index b1cf9cd9b6..a0fc1f1117 100644
--- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java
@@ -31,7 +31,6 @@
 import org.apache.drill.exec.physical.impl.OutputMutator;
 import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.store.AbstractRecordReader;
-import org.apache.drill.exec.store.kafka.KafkaSubScan.KafkaSubScanSpec;
 import org.apache.drill.exec.store.kafka.decoders.MessageReader;
 import org.apache.drill.exec.store.kafka.decoders.MessageReaderFactory;
 import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
@@ -52,7 +51,7 @@
 
   private final boolean unionEnabled;
   private final KafkaStoragePlugin plugin;
-  private final KafkaSubScanSpec subScanSpec;
+  private final KafkaPartitionScanSpec subScanSpec;
   private final long kafkaPollTimeOut;
 
   private long currentOffset;
@@ -62,7 +61,7 @@
   private final boolean readNumbersAsDouble;
   private final String kafkaMsgReader;
 
-  public KafkaRecordReader(KafkaSubScan.KafkaSubScanSpec subScanSpec, List<SchemaPath> projectedColumns,
+  public KafkaRecordReader(KafkaPartitionScanSpec subScanSpec, List<SchemaPath> projectedColumns,
       FragmentContext context, KafkaStoragePlugin plugin) {
     setColumns(projectedColumns);
     final OptionManager optionManager = context.getOptions();
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaScanBatchCreator.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaScanBatchCreator.java
index 9bedbd5c07..ae78d8c37c 100644
--- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaScanBatchCreator.java
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaScanBatchCreator.java
@@ -44,7 +44,7 @@ public CloseableRecordBatch getBatch(ExecutorFragmentContext context, KafkaSubSc
     List<SchemaPath> columns = subScan.getColumns() != null ? subScan.getColumns() : GroupScan.ALL_COLUMNS;
 
     List<RecordReader> readers = new LinkedList<>();
-    for (KafkaSubScan.KafkaSubScanSpec scanSpec : subScan.getPartitionSubScanSpecList()) {
+    for (KafkaPartitionScanSpec scanSpec : subScan.getPartitionSubScanSpecList()) {
       readers.add(new KafkaRecordReader(scanSpec, columns, context, subScan.getKafkaStoragePlugin()));
     }
 
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaStoragePlugin.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaStoragePlugin.java
index 157e3673ea..4ca91ec473 100644
--- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaStoragePlugin.java
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaStoragePlugin.java
@@ -71,7 +71,7 @@ public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) throws
 
   @Override
   public Set<StoragePluginOptimizerRule> getPhysicalOptimizerRules(OptimizerRulesContext optimizerRulesContext) {
-    return ImmutableSet.of();
+    return ImmutableSet.of(KafkaPushDownFilterIntoScan.INSTANCE);
   }
 
   @Override
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaSubScan.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaSubScan.java
index 468f766a96..d62faa60cc 100644
--- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaSubScan.java
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaSubScan.java
@@ -43,14 +43,14 @@
 
   private final KafkaStoragePlugin kafkaStoragePlugin;
   private final List<SchemaPath> columns;
-  private final List<KafkaSubScanSpec> partitionSubScanSpecList;
+  private final List<KafkaPartitionScanSpec> partitionSubScanSpecList;
 
   @JsonCreator
   public KafkaSubScan(@JacksonInject StoragePluginRegistry registry,
                       @JsonProperty("userName") String userName,
                       @JsonProperty("kafkaStoragePluginConfig") KafkaStoragePluginConfig kafkaStoragePluginConfig,
                       @JsonProperty("columns") List<SchemaPath> columns,
-                      @JsonProperty("partitionSubScanSpecList") LinkedList<KafkaSubScanSpec> partitionSubScanSpecList)
+                      @JsonProperty("partitionSubScanSpecList") LinkedList<KafkaPartitionScanSpec> partitionSubScanSpecList)
       throws ExecutionSetupException {
     this(userName,
         (KafkaStoragePlugin) registry.getPlugin(kafkaStoragePluginConfig),
@@ -61,7 +61,7 @@ public KafkaSubScan(@JacksonInject StoragePluginRegistry registry,
   public KafkaSubScan(String userName,
                       KafkaStoragePlugin kafkaStoragePlugin,
                       List<SchemaPath> columns,
-                      List<KafkaSubScanSpec> partitionSubScanSpecList) {
+                      List<KafkaPartitionScanSpec> partitionSubScanSpecList) {
     super(userName);
     this.kafkaStoragePlugin = kafkaStoragePlugin;
     this.columns = columns;
@@ -95,7 +95,7 @@ public KafkaStoragePluginConfig getKafkaStoragePluginConfig() {
   }
 
   @JsonProperty
-  public List<KafkaSubScanSpec> getPartitionSubScanSpecList() {
+  public List<KafkaPartitionScanSpec> getPartitionSubScanSpecList() {
     return partitionSubScanSpecList;
   }
 
@@ -108,68 +108,4 @@ public KafkaStoragePlugin getKafkaStoragePlugin() {
   public int getOperatorType() {
     return CoreOperatorType.KAFKA_SUB_SCAN_VALUE;
   }
-
-  public static class KafkaSubScanSpec {
-    protected String topicName;
-    protected int partitionId;
-    protected long startOffset;
-    protected long endOffset;
-
-    @JsonCreator
-    public KafkaSubScanSpec(@JsonProperty("topicName") String topicName, @JsonProperty("partitionId") int partitionId,
-        @JsonProperty("startOffset") long startOffset, @JsonProperty("endOffset") long endOffset) {
-      this.topicName = topicName;
-      this.partitionId = partitionId;
-      this.startOffset = startOffset;
-      this.endOffset = endOffset;
-    }
-
-    KafkaSubScanSpec() {
-
-    }
-
-    public String getTopicName() {
-      return topicName;
-    }
-
-    public int getPartitionId() {
-      return partitionId;
-    }
-
-    public long getStartOffset() {
-      return startOffset;
-    }
-
-    public long getEndOffset() {
-      return endOffset;
-    }
-
-    public KafkaSubScanSpec setTopicName(String topicName) {
-      this.topicName = topicName;
-      return this;
-    }
-
-    public KafkaSubScanSpec setPartitionId(int partitionId) {
-      this.partitionId = partitionId;
-      return this;
-    }
-
-    public KafkaSubScanSpec setStartOffset(long startOffset) {
-      this.startOffset = startOffset;
-      return this;
-    }
-
-    public KafkaSubScanSpec setEndOffset(long endOffset) {
-      this.endOffset = endOffset;
-      return this;
-    }
-
-    @Override
-    public String toString() {
-      return "KafkaSubScanSpec [topicName=" + topicName + ", partitionId=" + partitionId + ", startOffset="
-          + startOffset + ", endOffset=" + endOffset + "]";
-    }
-
-  }
-
 }
diff --git a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/MessageIterator.java b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/MessageIterator.java
index 3afb1b8d45..1c814f6843 100644
--- a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/MessageIterator.java
+++ b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/MessageIterator.java
@@ -23,7 +23,6 @@
 
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.store.kafka.KafkaSubScan.KafkaSubScanSpec;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -46,7 +45,7 @@
   private final long kafkaPollTimeOut;
   private final long endOffset;
 
-  public MessageIterator(final KafkaConsumer<byte[], byte[]> kafkaConsumer, final KafkaSubScanSpec subScanSpec,
+  public MessageIterator(final KafkaConsumer<byte[], byte[]> kafkaConsumer, final KafkaPartitionScanSpec subScanSpec,
       final long kafkaPollTimeOut) {
     this.kafkaConsumer = kafkaConsumer;
     this.kafkaPollTimeOut = kafkaPollTimeOut;
diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaFilterPushdownTest.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaFilterPushdownTest.java
new file mode 100644
index 0000000000..7be0ec3941
--- /dev/null
+++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaFilterPushdownTest.java
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import org.apache.drill.categories.KafkaStorageTest;
+import org.apache.drill.categories.SlowTest;
+import org.apache.kafka.common.serialization.StringSerializer;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.apache.drill.exec.store.kafka.TestKafkaSuit.NUM_JSON_MSG;
+import static org.apache.drill.exec.store.kafka.TestKafkaSuit.embeddedKafkaCluster;
+
+@Category({KafkaStorageTest.class, SlowTest.class})
+public class KafkaFilterPushdownTest extends KafkaTestBase {
+  private static final int NUM_PARTITIONS = 5;
+  private static final String expectedSubStr = "    \"kafkaScanSpec\" : {\n" +
+                                                   "      \"topicName\" : \"drill-pushdown-topic\"\n" +
+                                                   "    },\n" +
+                                                   "    \"cost\" : %s.0";
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    TestKafkaSuit.createTopicHelper(TestQueryConstants.JSON_PUSHDOWN_TOPIC, NUM_PARTITIONS);
+    KafkaMessageGenerator generator = new KafkaMessageGenerator(embeddedKafkaCluster.getKafkaBrokerList(),
+        StringSerializer.class);
+    generator.populateJsonMsgWithTimestamps(TestQueryConstants.JSON_PUSHDOWN_TOPIC, NUM_JSON_MSG);
+  }
+
+  /**
+   * Test filter pushdown with condition on kafkaMsgOffset.
+   * @throws Exception
+   */
+  @Test
+  public void testPushdownOnOffset() throws Exception {
+    final String predicate1 = "kafkaMsgOffset > 4";
+    final String predicate2 = "kafkaMsgOffset < 6";
+    final int expectedRowCount = 5; //1 * NUM_PARTITIONS
+
+    final String queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_AND,
+        TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate1, predicate2);
+
+    runKafkaSQLVerifyCount(queryString, expectedRowCount);
+    testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowCount));
+  }
+
+  /**
+   * Test filter pushdown with condition on kafkaPartitionId.
+   * @throws Exception
+   */
+  @Test
+  public void testPushdownOnPartition() throws Exception {
+    final String predicate = "kafkaPartitionId = 1";
+    final int expectedRowCount = NUM_JSON_MSG;
+
+    final String queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_BASIC,
+        TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate);
+
+    runKafkaSQLVerifyCount(queryString, expectedRowCount);
+    testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowCount));
+  }
+
+  /**
+   * Test filter pushdown with condition on kafkaPartitionId.
+   * @throws Exception
+   */
+  @Test
+  public void testPushdownOnTimestamp() throws Exception {
+    final String predicate = "kafkaMsgTimestamp > 6";
+    final int expectedRowCount = 20;
+
+    final String queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_BASIC,
+        TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate);
+
+    runKafkaSQLVerifyCount(queryString,expectedRowCount);
+    testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowCount));
+  }
+
+  /**
+   * Test filter pushdown when timestamp is not ordered.
+   * @throws Exception
+   */
+  @Test
+  public void testPushdownUnorderedTimestamp() throws Exception {
+    final String predicate = "kafkaMsgTimestamp = 1";
+    final int expectedRowInPlan = 50;
+    final int expectedRowCount = 5;
+
+    final String queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_BASIC,
+        TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate);
+
+    runKafkaSQLVerifyCount(queryString,expectedRowCount);
+    testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowInPlan));
+  }
+
+  /**
+   * Test filter pushdown when timestamp value specified does not exist.
+   * @throws Exception
+   */
+  @Test
+  public void testPushdownWhenTimestampDoesNotExist() throws Exception {
+    final String predicate = "kafkaMsgTimestamp = 20"; //20 does not exist
+    final int expectedRowCount = 0;
+
+    final String queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_BASIC,
+        TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate);
+
+    runKafkaSQLVerifyCount(queryString,expectedRowCount);
+    testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowCount));
+  }
+
+  /**
+   * Test filter pushdown when partition value specified does not exist.
+   * @throws Exception
+   */
+  @Test
+  public void testPushdownWhenPartitionDoesNotExist() throws Exception {
+    final String predicate = "kafkaPartitionId = 100"; //100 does not exist
+    final int expectedRowCount = 0;
+
+    final String queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_BASIC,
+        TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate);
+
+    runKafkaSQLVerifyCount(queryString,expectedRowCount);
+    testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowCount));
+  }
+
+  /**
+   * Test filter pushdown when timestamp exist but partition does not exist.
+   * @throws Exception
+   */
+  @Test
+  public void testPushdownForEmptyScanSpec() throws Exception {
+    final String predicate1 = "kafkaMsgTimestamp > 6";
+    final String predicate2 = "kafkaPartitionId = 100";
+    final int expectedRowCount = 0;
+
+    final String queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_AND,
+        TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate1, predicate2);
+
+    runKafkaSQLVerifyCount(queryString,expectedRowCount);
+    testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowCount));
+  }
+
+  /**
+   * Test filter pushdown on kafkaMsgOffset with boundary conditions.
+   * In every case, the number of records returned is 0.
+   * @throws Exception
+   */
+  @Test
+  public void testPushdownOffsetNoRecordsReturnedWithBoundaryConditions() throws Exception {
+    final int expectedRowCount = 0;
+
+    //"equal" such that value = endOffset
+    String queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_BASIC,
+        TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgOffset = 10");
+
+    runKafkaSQLVerifyCount(queryString,expectedRowCount);
+    testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowCount));
+
+    //"equal" such that value < startOffset
+    queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_BASIC,
+        TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgOffset = -1");
+
+    runKafkaSQLVerifyCount(queryString,expectedRowCount);
+    testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowCount));
+
+    //"greater_than" such that value = endOffset-1
+    queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_BASIC,
+        TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgOffset > 9");
+
+    runKafkaSQLVerifyCount(queryString,expectedRowCount);
+    testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowCount));
+
+    //"greater_than_or_equal" such that value = endOffset
+    queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_BASIC,
+        TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgOffset >= 10");
+
+    runKafkaSQLVerifyCount(queryString,expectedRowCount);
+    testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowCount));
+
+    //"less_than" such that value = startOffset
+    queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_BASIC,
+        TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgOffset < 0");
+
+    runKafkaSQLVerifyCount(queryString,expectedRowCount);
+    testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowCount));
+
+    //"less_than_or_equal" such that value < startOffset
+    queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_BASIC,
+        TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgOffset <= -1");
+
+    runKafkaSQLVerifyCount(queryString,expectedRowCount);
+    testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowCount));
+  }
+
+  /**
+   * Test filter pushdown on kafkaMsgOffset with boundary conditions.
+   * In every case, the number of records returned is 5 (1 per topic-partition).
+   * @throws Exception
+   */
+  @Test
+  public void testPushdownOffsetOneRecordReturnedWithBoundaryConditions() throws Exception {
+    final int expectedRowCount = 5;
+
+    //"equal" such that value = endOffset-1
+    String queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_BASIC,
+        TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgOffset = 9");
+
+    runKafkaSQLVerifyCount(queryString, expectedRowCount);
+    testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowCount));
+
+    //"greater_than" such that value = endOffset-2
+    queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_BASIC,
+        TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgOffset > 8");
+
+    runKafkaSQLVerifyCount(queryString,expectedRowCount);
+    testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowCount));
+
+    //"greater_than_or_equal" such that value = endOffset-1
+    queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_BASIC,
+        TestQueryConstants.JSON_PUSHDOWN_TOPIC, "kafkaMsgOffset >= 9");
+
+    runKafkaSQLVerifyCount(queryString,expectedRowCount);
+    testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowCount));
+  }
+
+  /**
+   * Test filter pushdown with OR.
+   * Pushdown is possible if all the predicates are on metadata fields.
+   * @throws Exception
+   */
+  @Test
+  public void testPushdownWithOr() throws Exception {
+    final String predicate1 = "kafkaMsgTimestamp > 6";
+    final String predicate2 = "kafkaPartitionId = 1";
+    final int expectedRowCount = 26;
+
+    final String queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_OR,
+        TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate1, predicate2);
+
+    runKafkaSQLVerifyCount(queryString,expectedRowCount);
+    testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowCount));
+  }
+
+  /**
+   * Test filter pushdown with OR on kafkaMsgTimestamp and kafkaMsgOffset.
+   * @throws Exception
+   */
+  @Test
+  public void testPushdownWithOr1() throws Exception {
+    final String predicate1 = "kafkaMsgTimestamp = 6";
+    final String predicate2 = "kafkaMsgOffset = 6";
+    final int expectedRowInPlan = 25; //startOff=5, endOff=9
+    final int expectedRowCount = 10;
+
+    final String queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_OR,
+        TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate1, predicate2);
+
+    runKafkaSQLVerifyCount(queryString,expectedRowCount);
+    testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowInPlan));
+  }
+
+  /**
+   * Test pushdown for a combination of AND and OR.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testPushdownWithAndOrCombo() throws Exception {
+    final String predicate1 = "kafkaMsgTimestamp > 6";
+    final String predicate2 = "kafkaPartitionId = 1";
+    final String predicate3 = "kafkaPartitionId = 2";
+    final int expectedRowCount = 8;
+
+    final String queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_AND_OR_PATTERN_1,
+        TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate1, predicate2, predicate3);
+
+    runKafkaSQLVerifyCount(queryString,expectedRowCount);
+    testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowCount));
+  }
+
+  /**
+   * Test pushdown for a combination of AND and OR.
+   * @throws Exception
+   */
+  @Test
+  public void testPushdownWithAndOrCombo2() throws Exception {
+    final String predicate1 = "kafkaMsgTimestamp = 6";
+    final String predicate2 = "kafkaMsgOffset = 6";
+    final String predicate3 = "kafkaPartitionId = 1";
+    final String predicate4 = "kafkaPartitionId = 2";
+    final int expectedRowCountInPlan = 10; //startOff=5, endOff=9 for 2 partitions
+    final int expectedRowCount = 4;
+
+    final String queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_AND_OR_PATTERN_3,
+        TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate1, predicate2, predicate3, predicate4);
+
+    runKafkaSQLVerifyCount(queryString,expectedRowCount);
+    testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowCountInPlan));
+  }
+
+  /**
+   * Test pushdown for predicate1 AND predicate2.
+   * Where predicate1 is on metadata field and and predicate2 is on user fields.
+   * @throws Exception
+   */
+  @Test
+  public void testPushdownTimestampWithNonMetaField() throws Exception {
+    final String predicate1 = "kafkaMsgTimestamp > 6";
+    final String predicate2 = "boolKey = true";
+    final int expectedRowCountInPlan = 20; //startOff=5, endOff=9
+    final int expectedRowCount = 10;
+
+    final String queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_AND,
+        TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate1, predicate2);
+
+    runKafkaSQLVerifyCount(queryString,expectedRowCount);
+    testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowCountInPlan));
+  }
+
+  /**
+   * Tests that pushdown does not happen for predicates such as
+   * non-metadata-field = val1 OR (kafkaMsgTimestamp > val2 AND kafkaMsgTimestamp < val4)
+   * @throws Exception
+   */
+  @Test
+  public void testNoPushdownOfOffsetWithNonMetadataField() throws Exception {
+    final String predicate1 = "boolKey = true";
+    final String predicate2 = "kafkaMsgTimestamp > 6";
+    final String predicate3 = "kafkaMsgTimestamp < 9";
+    final int expectedRowCountInPlan = 50; //no pushdown
+    final int expectedRowCount = 30;
+
+    final String queryString = String.format(TestQueryConstants.QUERY_TEMPLATE_AND_OR_PATTERN_2,
+        TestQueryConstants.JSON_PUSHDOWN_TOPIC, predicate1, predicate2, predicate3);
+
+    runKafkaSQLVerifyCount(queryString,expectedRowCount);
+    testPhysicalPlan(queryString, String.format(expectedSubStr, expectedRowCountInPlan));
+  }
+
+}
diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaMessageGenerator.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaMessageGenerator.java
index 1931898a53..32d9ff10a4 100644
--- a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaMessageGenerator.java
+++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaMessageGenerator.java
@@ -35,6 +35,7 @@
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -131,4 +132,37 @@ public void populateJsonMsgIntoKafka(String topic, int numMsg) throws Interrupte
     }
   }
 
+  public void populateJsonMsgWithTimestamps(String topic, int numMsg) {
+    KafkaProducer<String, String> producer = null;
+    Random rand = new Random();
+    try {
+      producer = new KafkaProducer<String, String>(producerProperties);
+      int halfCount = numMsg / 2;
+
+      for(PartitionInfo tpInfo : producer.partitionsFor(topic)) {
+        for (int i = 1; i <= numMsg; ++i) {
+          JsonObject object = new JsonObject();
+          object.addProperty("stringKey", UUID.randomUUID().toString());
+          object.addProperty("intKey", numMsg - i);
+          object.addProperty("boolKey", i % 2 == 0);
+
+          long timestamp = i < halfCount ? (halfCount - i) : i;
+          ProducerRecord<String, String> message =
+              new ProducerRecord<String, String>(tpInfo.topic(), tpInfo.partition(), timestamp, "key"+i, object.toString());
+          logger.info("Publishing message : {}", message);
+          Future<RecordMetadata> future = producer.send(message);
+          logger.info("Committed offset of the message : {}", future.get().offset());
+        }
+
+      }
+    } catch (Throwable th) {
+      logger.error(th.getMessage(), th);
+      throw new DrillRuntimeException(th.getMessage(), th);
+    } finally {
+      if (producer != null) {
+        producer.close();
+      }
+    }
+  }
+
 }
diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/MessageIteratorTest.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/MessageIteratorTest.java
index 4a155963fa..4347167576 100644
--- a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/MessageIteratorTest.java
+++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/MessageIteratorTest.java
@@ -25,7 +25,6 @@
 import org.apache.drill.categories.SlowTest;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.exec.proto.UserBitShared.DrillPBError.ErrorType;
-import org.apache.drill.exec.store.kafka.KafkaSubScan.KafkaSubScanSpec;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -40,7 +39,7 @@
 public class MessageIteratorTest extends KafkaTestBase {
 
   private KafkaConsumer<byte[], byte[]> kafkaConsumer;
-  private KafkaSubScanSpec subScanSpec;
+  private KafkaPartitionScanSpec subScanSpec;
 
   @Before
   public void setUp() {
@@ -49,7 +48,7 @@ public void setUp() {
     consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
     consumerProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "4");
     kafkaConsumer = new KafkaConsumer<>(consumerProps);
-    subScanSpec = new KafkaSubScanSpec(TestQueryConstants.JSON_TOPIC, 0, 0, TestKafkaSuit.NUM_JSON_MSG);
+    subScanSpec = new KafkaPartitionScanSpec(TestQueryConstants.JSON_TOPIC, 0, 0, TestKafkaSuit.NUM_JSON_MSG);
   }
 
   @After
diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestKafkaSuit.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestKafkaSuit.java
index ed0174755a..ecf998e3ad 100644
--- a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestKafkaSuit.java
+++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestKafkaSuit.java
@@ -27,6 +27,7 @@
 import org.apache.drill.exec.ZookeeperTestUtil;
 import org.apache.drill.exec.store.kafka.cluster.EmbeddedKafkaCluster;
 import org.apache.drill.exec.store.kafka.decoders.MessageReaderFactoryTest;
+import org.apache.kafka.common.config.TopicConfig;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.security.JaasUtils;
 
@@ -46,7 +47,8 @@
 
 @Category({KafkaStorageTest.class, SlowTest.class})
 @RunWith(Suite.class)
-@SuiteClasses({ KafkaQueriesTest.class, MessageIteratorTest.class, MessageReaderFactoryTest.class })
+@SuiteClasses({ KafkaQueriesTest.class, MessageIteratorTest.class, MessageReaderFactoryTest.class,
+    KafkaFilterPushdownTest.class })
 public class TestKafkaSuit {
   private static final Logger logger = LoggerFactory.getLogger(LoggerFactory.class);
   private static final String LOGIN_CONF_RESOURCE_PATHNAME = "login.conf";
@@ -106,4 +108,18 @@ public static void tearDownCluster() throws Exception {
     }
   }
 
+  public static void createTopicHelper(final String topicName, final int partitions) {
+
+    Properties topicProps = new Properties();
+    topicProps.put(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "CreateTime");
+    ZkUtils zkUtils = new ZkUtils(zkClient,
+        new ZkConnection(embeddedKafkaCluster.getZkServer().getConnectionString()), false);
+    AdminUtils.createTopic(zkUtils, topicName, partitions, 1,
+        topicProps, RackAwareMode.Disabled$.MODULE$);
+
+    org.apache.kafka.common.requests.MetadataResponse.TopicMetadata fetchTopicMetadataFromZk =
+        AdminUtils.fetchTopicMetadataFromZk(topicName, zkUtils);
+    logger.info("Topic Metadata: " + fetchTopicMetadataFromZk);
+  }
+
 }
diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestQueryConstants.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestQueryConstants.java
index 057af7eb9b..b3163adbcd 100644
--- a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestQueryConstants.java
+++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestQueryConstants.java
@@ -29,12 +29,24 @@
   int MAX_CLIENT_CONNECTIONS = 100;
 
   String JSON_TOPIC = "drill-json-topic";
+  String JSON_PUSHDOWN_TOPIC = "drill-pushdown-topic";
   String AVRO_TOPIC = "drill-avro-topic";
   String INVALID_TOPIC = "invalid-topic";
 
+  String KAFKA_MSG_TIMESTAMP_FIELD = "kafkaMsgTimestamp";
+  String KAFKA_PARTITION_ID_FIELD = "kafkaPartitionId";
+  String KAFKA_MSG_OFFSET_FIELD = "kafkaMsgOffset";
+
   // Queries
   String MSG_COUNT_QUERY = "select count(*) from kafka.`%s`";
   String MSG_SELECT_QUERY = "select * from kafka.`%s`";
   String MIN_OFFSET_QUERY = "select MIN(kafkaMsgOffset) as minOffset from kafka.`%s`";
   String MAX_OFFSET_QUERY = "select MAX(kafkaMsgOffset) as maxOffset from kafka.`%s`";
+
+  String QUERY_TEMPLATE_BASIC = "select * from kafka.`%s` where %s";
+  String QUERY_TEMPLATE_AND = "select * from kafka.`%s` where %s AND %s";
+  String QUERY_TEMPLATE_OR = "select * from kafka.`%s` where %s OR %s";
+  String QUERY_TEMPLATE_AND_OR_PATTERN_1 = "select * from kafka.`%s` where %s AND (%s OR %s)";
+  String QUERY_TEMPLATE_AND_OR_PATTERN_2 = "select * from kafka.`%s` where %s OR (%s AND %s)";
+  String QUERY_TEMPLATE_AND_OR_PATTERN_3 = "select * from kafka.`%s` where (%s OR %s) AND (%s OR %s)";
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services