Posted to issues@flink.apache.org by twalthr <gi...@git.apache.org> on 2015/09/14 15:37:10 UTC

[GitHub] flink pull request: [FLINK-2167] [table] Add fromHCat() to TableEn...

GitHub user twalthr opened a pull request:

    https://github.com/apache/flink/pull/1127

    [FLINK-2167] [table] Add fromHCat() to TableEnvironment

    This PR introduces input format interfaces (so-called `TableSource`s) for the Table API. There are two types of TableSources:
    
    - `AdaptiveTableSource`s can adapt their output to the requirements of the plan. Although the output schema stays the same, the TableSource can react to field resolutions and/or predicates internally and return adapted DataSet/DataStream versions in the "translate" step.
    - `StaticTableSource`s are an easy way to provide the Table API with additional input formats without much implementation effort (e.g. for `fromCsvFile()`). A sketch of both interfaces follows below.
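    
    To make the distinction more concrete, here is a rough sketch of the two interfaces in Scala (the method names below appear in this PR, but the signatures are simplified and should not be taken as authoritative):
    
    ```scala
    import org.apache.flink.api.common.typeinfo.TypeInformation
    import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
    import org.apache.flink.api.table.Row
    import org.apache.flink.api.table.expressions.Expression
    
    // common marker trait for all Table API input formats
    trait TableSource
    
    // a static source just creates its DataSet once; nothing is pushed down
    trait StaticTableSource extends TableSource {
      def getOutputFieldNames(): Seq[String]
      def createStaticDataSet(env: ExecutionEnvironment): DataSet[Row]
    }
    
    // an adaptive source is notified about predicates and resolved fields
    // during plan analysis and may return an optimized DataSet in the
    // "translate" step
    trait AdaptiveTableSource extends TableSource {
      def supportsPredicatePushdown(): Boolean
      def supportsResolvedFieldPushdown(): Boolean
      def notifyPredicates(expr: Expression): Unit
      def getOutputFields(): Seq[(String, TypeInformation[_])]
      def createAdaptiveDataSet(env: ExecutionEnvironment): DataSet[Row]
    }
    ```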
    
    TableSources have been deeply integrated into the Table API.
    
    The TableEnvironment now requires the newly introduced `AbstractExecutionEnvironment` (the common superclass of all ExecutionEnvironments for DataSets and DataStreams).
    
    An example of an AdaptiveTableSource can be found in `HCatTableSource`. HCatTableSource supports predicate pushdown as well as selection pushdown to HCatalog. Only predicates on partitioned columns are pushed to HCatalog. Unresolved fields are not read from HCatalog and remain `null` within the Table API's rows.
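    
    As a rough illustration (this is not the actual `HCatTableSource` code; the `And` and `EqualTo` case classes are assumed to match the Table API's expression tree), the partition-column filtering inside such a source could look like this:
    
    ```scala
    import org.apache.flink.api.table.expressions._
    
    import scala.collection.mutable.ArrayBuffer
    
    // hypothetical sketch: keep only those notified predicates that reference
    // a partitioned column; everything else stays in the plan and is
    // evaluated by Flink as usual
    class PartitionPredicateCollector(partitionCols: Set[String]) {
    
      val pushable = new ArrayBuffer[Expression]()
    
      def notifyPredicates(expr: Expression): Unit = expr match {
        // conjunctions can be split and their parts handled independently
        case And(left, right) =>
          notifyPredicates(left)
          notifyPredicates(right)
        // a comparison on a partitioned column can be handed to HCatalog
        case eq @ EqualTo(ResolvedFieldReference(name, _), _)
            if partitionCols.contains(name) =>
          pushable += eq
        // everything else is not pushable and simply ignored
        case _ =>
      }
    }
    ```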
    
    A simple example looks like this:
    ```java
    TableEnvironment t = new TableEnvironment(env);
    t.fromHCat("database", "table")
      .select("col1, col2")
      .filter("partCol==='5'");
    ```
    
    Here's what a TableSource gets to see in more complicated queries:
    
    ```
    getTableJava(tableSource1)
      .filter("a===5 || a===6")
      .select("a as a4, b as b4, c as c4")
      .filter("b4===7")
      .join(getTableJava(tableSource2))
      .where("a===a4 && c==='Test' && c4==='Test2'")
    
    // Result predicates for tableSource1:
    // 	List("a===5 || a===6", "b===7", "c==='Test2'")
    // Result predicates for tableSource2:
    //	List("c==='Test'")
    // Result resolved fields for tableSource1 (true = filtering, false=selection):
    // 	Set(("a", true), ("a", false), ("b", true), ("b", false), ("c", false), ("c", true))
    // Result resolved fields for tableSource2 (true = filtering, false=selection):
    // 	Set(("a", true), ("c", true))
    ```
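    
    The resolved-field sets above could be consumed by a source roughly like this (a sketch only; `notifyResolvedField` is a hypothetical callback name used for illustration):
    
    ```scala
    import scala.collection.mutable
    
    // hypothetical sketch of how an adaptive source could track which fields
    // the plan actually resolved and later project only those
    class ResolvedFieldTracker(allFields: Seq[String]) {
    
      // (fieldName, usedForFiltering) pairs, as in the sets printed above
      private val resolved = mutable.Set[(String, Boolean)]()
    
      def notifyResolvedField(field: String, filtering: Boolean): Unit =
        resolved += ((field, filtering))
    
      // fields that were never resolved are not read from the source and
      // remain null in the Table API's rows
      def projection: Seq[String] =
        allFields.filter(f => resolved.exists(_._1 == f))
    }
    ```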
    
    
    HCatTableSource has no tests yet, but I will implement them soon. First, I would be happy to get some general feedback.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/twalthr/flink TableApiHcat

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1127.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1127
    
----
commit f245604caccd8f97c1d6eabf16968dab3aa47572
Author: twalthr <tw...@apache.org>
Date:   2015-07-09T09:57:05Z

    [FLINK-2167] [table] Add fromHCat() to TableEnvironment

----



[GitHub] flink pull request: [FLINK-2167] [table] Add fromHCat() to TableEn...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr closed the pull request at:

    https://github.com/apache/flink/pull/1127



[GitHub] flink pull request: [FLINK-2167] [table] Add fromHCat() to TableEn...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1127#discussion_r40536239
  
    --- Diff: flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/PredicatePushdown.scala ---
    @@ -0,0 +1,124 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.table.expressions.analysis
    +
    +import org.apache.flink.api.table.expressions._
    +import org.apache.flink.api.table.expressions.analysis.FieldBacktracker
    +.resolveFieldNameAndTableSource
    +import org.apache.flink.api.table.expressions.analysis.PredicateFilter.pruneExpr
    +import org.apache.flink.api.table.input.{AdaptiveTableSource, TableSource}
    +import org.apache.flink.api.table.plan._
    +import org.apache.flink.api.table.trees.Rule
    +
    +import scala.collection.mutable.ArrayBuffer
    +
    +/**
    + * Pushes constant predicates (e.g. a===12 && b.isNotNull) to each corresponding
    + * AdaptiveTableSource that supports predicates.
    + */
    +class PredicatePushdown(val inputOperation: PlanNode) extends Rule[Expression] {
    +
    +  def apply(expr: Expression) = {
    +    // get all table sources into which predicates can be pushed
    +    val tableSources = getPushableTableSources(inputOperation)
    +
    +    // prune expression tree such that it only contains constant predicates
    +    // such as a=1, a="Hello World", isNull(a) but not a=b
    +    val constantExpr = pruneExpr(isResolvedAndConstant, expr)
    +
    +    // push predicates to each table source respectively
    +    for (ts <- tableSources) {
    +      // prune expression tree such that it only contains field references of ts
    +      val tsExpr = pruneExpr((e) => isSameTableSource(e, ts), constantExpr)
    +
    +      // resolve field names to field names of the table source
    +      val result = tsExpr.transformPost {
    +        case rfr@ResolvedFieldReference(fieldName, typeInfo) =>
    +          ResolvedFieldReference(
    +            resolveFieldNameAndTableSource(inputOperation, fieldName)._2,
    +            typeInfo
    +          )
    +      }
    +      // push down predicates
    +      if (result != NopExpression()) {
    +        ts.notifyPredicates(result)
    +      }
    +    }
    +    expr
    +  }
    +
    +  // ----------------------------------------------------------------------------------------------
    +
    +  /**
    +   * @return all AdaptiveTableSources the given PlanNode contains
    +   */
    +  def getPushableTableSources(tree: PlanNode): Seq[AdaptiveTableSource] = tree match {
    +    case Root(ts: AdaptiveTableSource, _) if ts.supportsPredicatePushdown() => Seq(ts)
    +    case pn:PlanNode =>
    +      val res = new ArrayBuffer[AdaptiveTableSource]()
    +      for (child <- pn.children) res ++= getPushableTableSources(child)
    +      res
    --- End diff --
    
    This can be replaced by:
    ```scala
    pn.children flatMap { child => getPushableTableSources(child) }
    ```
    if I'm not mistaken, seems more Scala-y :smile: 



[GitHub] flink pull request: [FLINK-2167] [table] Add fromHCat() to TableEn...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1127#discussion_r40674516
  
    --- Diff: flink-staging/flink-table/pom.xml ---
    @@ -37,7 +37,7 @@ under the License.
     		<dependency>
     			<groupId>com.google.guava</groupId>
     			<artifactId>guava</artifactId>
    -			<version>${guava.version}</version>
    +			<version>14.0.1</version>
    --- End diff --
    
    Thanks so far, I will look into this issue again and will report.



[GitHub] flink pull request: [FLINK-2167] [table] Add fromHCat() to TableEn...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on the pull request:

    https://github.com/apache/flink/pull/1127#issuecomment-140355813
  
    Hi @chiwanpark, yes, I think splitting it up makes sense. I just opened this PR to get some feedback and to show why my changes are necessary to integrate new input formats like HCatalog. You can ignore the `HCatTableSource` class as it is still untested anyway.



[GitHub] flink pull request: [FLINK-2167] [table] Add fromHCat() to TableEn...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1127#discussion_r40540522
  
    --- Diff: flink-staging/flink-table/pom.xml ---
    @@ -59,6 +59,12 @@ under the License.
     		</dependency>
     
     		<dependency>
    +			<groupId>org.apache.flink</groupId>
    +			<artifactId>flink-hcatalog</artifactId>
    +			<version>${project.version}</version>
    +		</dependency>
    +
    +		<dependency>
    --- End diff --
    
    What do you mean by "put into another package"? Do you mean putting Table API classes in the `flink-hcatalog` package? I think that's a general decision we have to make. Or do we want additional Maven modules for Table API I/O formats? For Parquet, HCat, etc.?



[GitHub] flink pull request: [FLINK-2167] [table] Add fromHCat() to TableEn...

Posted by chiwanpark <gi...@git.apache.org>.
Github user chiwanpark commented on the pull request:

    https://github.com/apache/flink/pull/1127#issuecomment-140327645
  
    Hi @twalthr, thanks for your contribution. But this PR contains many changes unrelated to HCatalog format. Maybe we should split this PR into HCatalog and other changes.



[GitHub] flink pull request: [FLINK-2167] [table] Add fromHCat() to TableEn...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1127#discussion_r40545392
  
    --- Diff: flink-staging/flink-table/pom.xml ---
    @@ -37,7 +37,7 @@ under the License.
     		<dependency>
     			<groupId>com.google.guava</groupId>
     			<artifactId>guava</artifactId>
    -			<version>${guava.version}</version>
    +			<version>14.0.1</version>
    --- End diff --
    
    But then this could lead to problems with shading. Let's wait for @rmetzger on this; he probably knows this stuff best.




[GitHub] flink pull request: [FLINK-2167] [table] Add fromHCat() to TableEn...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1127#discussion_r40545360
  
    --- Diff: flink-staging/flink-table/pom.xml ---
    @@ -59,6 +59,12 @@ under the License.
     		</dependency>
     
     		<dependency>
    +			<groupId>org.apache.flink</groupId>
    +			<artifactId>flink-hcatalog</artifactId>
    +			<version>${project.version}</version>
    +		</dependency>
    +
    +		<dependency>
    --- End diff --
    
    I meant putting the HCat-specific stuff into a different package. But I now realize that we could not have the `fromHCat` call on `TableEnvironment` then. So this is ok, I guess.



[GitHub] flink pull request: [FLINK-2167] [table] Add fromHCat() to TableEn...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1127#discussion_r40534229
  
    --- Diff: flink-staging/flink-table/pom.xml ---
    @@ -59,6 +59,12 @@ under the License.
     		</dependency>
     
     		<dependency>
    +			<groupId>org.apache.flink</groupId>
    +			<artifactId>flink-hcatalog</artifactId>
    +			<version>${project.version}</version>
    +		</dependency>
    +
    +		<dependency>
    --- End diff --
    
    Is it necessary to have the hcatalog dependency here or could it be put into another package?



[GitHub] flink pull request: [FLINK-2167] [table] Add fromHCat() to TableEn...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1127#discussion_r40534596
  
    --- Diff: flink-staging/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala ---
    @@ -54,6 +56,26 @@ class JavaBatchTranslator extends PlanTranslator {
         Table(Root(rowDataSet, resultFields))
       }
     
    +  override def createTable(tableSource: TableSource): Table = {
    +    // a TableSource requires an ExecutionEnvironment
    +    if (env == null) {
    +      throw new InvalidProgramException("This operation requires an ExecutionEnvironment.")
    +    }
    +    if (tableSource.isInstanceOf[AdaptiveTableSource]) {
    +      val adaptiveTs = tableSource.asInstanceOf[AdaptiveTableSource]
    +      if (adaptiveTs.supportsResolvedFieldPushdown || adaptiveTs.supportsPredicatePushdown) {
    +        Table(Root(adaptiveTs, adaptiveTs.getOutputFields()))
    +      }
    +      else {
    +        Table(Root(adaptiveTs.createAdaptiveDataSet(env), adaptiveTs.getOutputFields()))
    +      }
    +    }
    +    else {
    +      val staticTs = tableSource.asInstanceOf[StaticTableSource]
    +      createTable(staticTs.createStaticDataSet(env), staticTs.getOutputFieldNames().mkString(","))
    +    }
    --- End diff --
    
    You can replace the if/else by:
    
    ```scala
    tableSource match {
      case adaptive: AdaptiveTableSource if adaptive.supportsResolvedFieldPushdown && ... => ...
    
      case adaptive: AdaptiveTableSource => ...
      
      case static: StaticTableSource => ...
    
      case _ => throw new Exception("something unexpected")
    }
    ```
    
    if I'm not mistaken, this is just off the top of my head.



[GitHub] flink pull request: [FLINK-2167] [table] Add fromHCat() to TableEn...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1127#discussion_r40534836
  
    --- Diff: flink-staging/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala ---
    @@ -118,6 +140,12 @@ class JavaBatchTranslator extends PlanTranslator {
           case Root(dataSet: JavaDataSet[Row], resultFields) =>
             dataSet
     
    +      case Root(tableSource: AdaptiveTableSource, resultFields) =>
    +        if (env == null) {
    +          throw new InvalidProgramException("This operation requires an TableEnvironment.");
    --- End diff --
    
    semicolon



[GitHub] flink pull request: [FLINK-2167] [table] Add fromHCat() to TableEn...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1127#discussion_r40534193
  
    --- Diff: flink-staging/flink-table/pom.xml ---
    @@ -37,7 +37,7 @@ under the License.
     		<dependency>
     			<groupId>com.google.guava</groupId>
     			<artifactId>guava</artifactId>
    -			<version>${guava.version}</version>
    +			<version>14.0.1</version>
    --- End diff --
    
    Why do you manually set the version here? Is it necessary because of something with HCat? @rmetzger what could we do in such a case?



[GitHub] flink pull request: [FLINK-2167] [table] Add fromHCat() to TableEn...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1127#discussion_r40535960
  
    --- Diff: flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/ResolveFieldReferences.scala ---
    @@ -30,21 +33,43 @@ import org.apache.flink.api.table.trees.Rule
      * Rule that resolved field references. This rule verifies that field references point to existing
      * fields of the input operation and creates [[ResolvedFieldReference]]s that hold the field
      * [[TypeInformation]] in addition to the field name.
    + * 
    + * @param inputOperation is optional but required if resolved fields should be pushed to underlying
    + *                       table sources.
    + * @param filtering defines if the field is resolved as a part of filtering operation or not
      */
    -class ResolveFieldReferences(inputFields: Seq[(String, TypeInformation[_])])
    +class ResolveFieldReferences(inputFields: Seq[(String, TypeInformation[_])],
    +                             inputOperation: PlanNode,
    +                             filtering: Boolean)
    --- End diff --
    
    It is Scala style to indent the parameters by 4 spaces (even the first parameter) if they don't fit on a line, like so:
    ```scala
    class ResolveFieldReferences(
        inputFields: Seq[(String, TypeInformation[_])],
        inputOperation: PlanNode,
        filtering: Boolean) extends Rule[Expression] {
    ```



[GitHub] flink pull request: [FLINK-2167] [table] Add fromHCat() to TableEn...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1127#discussion_r40536932
  
    --- Diff: flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/expressions/analysis/PredicateFilter.scala ---
    @@ -0,0 +1,88 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.table.expressions.analysis
    +
    +import org.apache.flink.api.table.expressions._
    +
    +object PredicateFilter {
    --- End diff --
    
    Maybe this should be called `PredicatePruner`.



[GitHub] flink pull request: [FLINK-2167] [table] Add fromHCat() to TableEn...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on the pull request:

    https://github.com/apache/flink/pull/1127#issuecomment-143718074
  
    Thanks @aljoscha for reviewing my code! Sorry for the bad Scala style. This is the first really large Scala project I have written (not just small scripts); I'm still learning by doing ;) I will correct the issues you mentioned.
    
    For the technical part: Yes, I don't modify the expression tree; I'm just giving the sources the possibility to adapt to the needs of the program. Regarding the `supports*` methods, yes, you are right, they are not strictly necessary, but I thought it made sense for possible future table sources to check that in advance. In some cases it also reduces the number of method calls, but I can also remove the `supports*` methods for the sake of simplicity, no problem.



[GitHub] flink pull request: [FLINK-2167] [table] Add fromHCat() to TableEn...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on the pull request:

    https://github.com/apache/flink/pull/1127#issuecomment-145001030
  
    Again, thanks for the feedback.
    I will close this PR and open two separate PRs for TableSources and HCatInputFormat.



[GitHub] flink pull request: [FLINK-2167] [table] Add fromHCat() to TableEn...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the pull request:

    https://github.com/apache/flink/pull/1127#issuecomment-143728302
  
    Don't worry, everyone was a beginner in Scala once. :smiley:
    
    You can also leave the `supports*` methods in if you think that they might be necessary in the future.



[GitHub] flink pull request: [FLINK-2167] [table] Add fromHCat() to TableEn...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the pull request:

    https://github.com/apache/flink/pull/1127#issuecomment-143703367
  
    Hi,
    I like the work. :smile: 
    
    Some remarks about Scala style: we are trying to make our Scala code more consistent. I didn't comment on everything, mostly just on one particular pattern of code that could be changed.
    
    Now, for the technical part: you are not actually removing the predicates that are pushed down from an expression, correct? This should be alright, since the result will still be correct. Removing them might be a future optimization, but I also think that the cost of evaluating the predicate is negligible. The real improvement comes from early filtering in the sources, as you implemented.
    
    Then, why do you have the `supports*` methods in `AdaptiveTableSource`? Couldn't these methods just do nothing in case the source does not support the feature? Or maybe return false if the pushdown was not successful? (A small sketch of that idea follows below.) I also wonder why you differentiate between AdaptiveTableSources that support pushdown and those that don't in `JavaBatchTranslator.createTable`. (I think you do it so that nothing gets pushed to sources that don't support it, but this distinction might not be necessary, as mentioned above.)
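    
    Something like this is what I have in mind (just a sketch, not code from this PR):
    
    ```scala
    import org.apache.flink.api.table.expressions.Expression
    
    // sketch: instead of asking a source up front via supportsPredicatePushdown(),
    // the notification itself reports whether the predicate was used
    trait AdaptiveTableSource {
      // returns true if the predicate was pushed down, false if it was ignored
      def notifyPredicates(expr: Expression): Boolean
    }
    
    // a source without pushdown support simply ignores the call
    class NoPushdownSource extends AdaptiveTableSource {
      override def notifyPredicates(expr: Expression): Boolean = false
    }
    ```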



[GitHub] flink pull request: [FLINK-2167] [table] Add fromHCat() to TableEn...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1127#discussion_r40540138
  
    --- Diff: flink-staging/flink-table/pom.xml ---
    @@ -37,7 +37,7 @@ under the License.
     		<dependency>
     			<groupId>com.google.guava</groupId>
     			<artifactId>guava</artifactId>
    -			<version>${guava.version}</version>
    +			<version>14.0.1</version>
    --- End diff --
    
    Yes, I had method signature problems with the HCat input format. We are using a very old version of HCat and therefore need a very old version of Guava.



[GitHub] flink pull request: [FLINK-2167] [table] Add fromHCat() to TableEn...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1127#discussion_r40674158
  
    --- Diff: flink-staging/flink-table/pom.xml ---
    @@ -37,7 +37,7 @@ under the License.
     		<dependency>
     			<groupId>com.google.guava</groupId>
     			<artifactId>guava</artifactId>
    -			<version>${guava.version}</version>
    +			<version>14.0.1</version>
    --- End diff --
    
    What exactly was the problem with the hcat input format and guava?
    If their guava version is really incompatible with ours, we can do the following:
    - Use a newer HCat version.
    - Create a special shaded-hcat Maven module which shades away HCat's Guava.

