You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2016/10/22 08:10:31 UTC
[1/2] flink git commit: [FLINK-4581] [table] Fix Table API throwing "No suitable driver found for jdbc:calcite"
Repository: flink
Updated Branches:
refs/heads/master 770f2f83a -> 227cdc829
[FLINK-4581] [table] Fix Table API throwing "No suitable driver found for jdbc:calcite"
This closes #2506
This closes #1491 // closing stale PR
This closes #997 // closing stale PR
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f0e451ae
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f0e451ae
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f0e451ae
Branch: refs/heads/master
Commit: f0e451ae3cbfe40926d08ffaf04cdff58136b306
Parents: 770f2f8
Author: twalthr <tw...@apache.org>
Authored: Fri Sep 16 11:41:15 2016 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Sat Oct 22 10:09:32 2016 +0200
----------------------------------------------------------------------
.../flink/api/table/FlinkRelBuilder.scala | 38 ++++++--------------
1 file changed, 11 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/f0e451ae/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkRelBuilder.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkRelBuilder.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkRelBuilder.scala
index 34ed4ce..1215806 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkRelBuilder.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkRelBuilder.scala
@@ -18,24 +18,25 @@
package org.apache.flink.api.table
+import java.util.Collections
+
import org.apache.calcite.jdbc.CalciteSchema
-import org.apache.calcite.plan.{Context, RelOptCluster, RelOptPlanner, RelOptSchema}
+import org.apache.calcite.plan._
+import org.apache.calcite.plan.volcano.VolcanoPlanner
import org.apache.calcite.prepare.CalciteCatalogReader
-import org.apache.calcite.rex.{RexExecutorImpl, RexBuilder}
-import org.apache.calcite.schema.{Schemas, SchemaPlus}
-import org.apache.calcite.tools.Frameworks.PlannerAction
-import org.apache.calcite.tools.{FrameworkConfig, Frameworks, RelBuilder}
+import org.apache.calcite.rex.RexBuilder
+import org.apache.calcite.tools.{FrameworkConfig, RelBuilder}
/**
* Flink specific [[RelBuilder]] that changes the default type factory to a [[FlinkTypeFactory]].
*/
class FlinkRelBuilder(
context: Context,
- cluster: RelOptCluster,
+ relOptCluster: RelOptCluster,
relOptSchema: RelOptSchema)
extends RelBuilder(
context,
- cluster,
+ relOptCluster,
relOptSchema) {
def getPlanner: RelOptPlanner = cluster.getPlanner
@@ -49,37 +50,20 @@ class FlinkRelBuilder(
object FlinkRelBuilder {
def create(config: FrameworkConfig): FlinkRelBuilder = {
- // prepare planner and collect context instances
- val clusters: Array[RelOptCluster] = Array(null)
- val relOptSchemas: Array[RelOptSchema] = Array(null)
- val rootSchemas: Array[SchemaPlus] = Array(null)
- Frameworks.withPlanner(new PlannerAction[Void] {
- override def apply(
- cluster: RelOptCluster,
- relOptSchema: RelOptSchema,
- rootSchema: SchemaPlus)
- : Void = {
- clusters(0) = cluster
- relOptSchemas(0) = relOptSchema
- rootSchemas(0) = rootSchema
- null
- }
- })
- val planner = clusters(0).getPlanner
- planner.setExecutor(config.getExecutor)
- val defaultRelOptSchema = relOptSchemas(0).asInstanceOf[CalciteCatalogReader]
// create Flink type factory
val typeSystem = config.getTypeSystem
val typeFactory = new FlinkTypeFactory(typeSystem)
// create context instances with Flink type factory
+ val planner = new VolcanoPlanner(Contexts.empty())
+ planner.addRelTraitDef(ConventionTraitDef.INSTANCE)
val cluster = RelOptCluster.create(planner, new RexBuilder(typeFactory))
val calciteSchema = CalciteSchema.from(config.getDefaultSchema)
val relOptSchema = new CalciteCatalogReader(
calciteSchema,
config.getParserConfig.caseSensitive(),
- defaultRelOptSchema.getSchemaName,
+ Collections.emptyList(),
typeFactory)
new FlinkRelBuilder(config.getContext, cluster, relOptSchema)
[2/2] flink git commit: [FLINK-4838] [docs] Remove STREAM keyword in StreamSQLExample
Posted by fh...@apache.org.
[FLINK-4838] [docs] Remove STREAM keyword in StreamSQLExample
This closes #2645
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/227cdc82
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/227cdc82
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/227cdc82
Branch: refs/heads/master
Commit: 227cdc82917e72ad41180b054f1ca715af394baf
Parents: f0e451a
Author: manuzhang <ow...@gmail.com>
Authored: Mon Oct 17 12:48:35 2016 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Sat Oct 22 10:09:56 2016 +0200
----------------------------------------------------------------------
.../scala/org/apache/flink/examples/scala/StreamSQLExample.scala | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/227cdc82/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamSQLExample.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamSQLExample.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamSQLExample.scala
index 5b39080..63a5413 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamSQLExample.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamSQLExample.scala
@@ -59,8 +59,8 @@ object StreamSQLExample {
// union the two tables
val result = tEnv.sql(
- "SELECT STREAM * FROM OrderA WHERE amount > 2 UNION ALL " +
- "SELECT STREAM * FROM OrderB WHERE amount < 2")
+ "SELECT * FROM OrderA WHERE amount > 2 UNION ALL " +
+ "SELECT * FROM OrderB WHERE amount < 2")
result.toDataStream[Order].print()