Posted to user@flink.apache.org by Flavio Pompermaier <po...@okkam.it> on 2018/10/22 15:06:03 UTC

Java Table API and external catalog bug?

Hi to all,
I've tried to register an external catalog and use it with the Table API in
Flink 1.6.1.
The following (Java) test job cannot write to a sink using insertInto
because Flink cannot find the table by id (test.t2). Am I doing something
wrong or is this a bug?

This is my Java test class:

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.core.fs.FileSystem.WriteMode;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.table.catalog.ExternalCatalogTable;
import org.apache.flink.table.catalog.InMemoryExternalCatalog;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.FormatDescriptor;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.table.sinks.CsvTableSink;

public class CatalogExperiment {
  public static void main(String[] args) throws Exception {
    // create an external catalog
    final String outPath = "file:/tmp/file2.txt";
    InMemoryExternalCatalog catalog = new InMemoryExternalCatalog("test");
    FileSystem connDescIn = new FileSystem().path("file:/tmp/file-test.txt");
    FileSystem connDescOut = new FileSystem().path(outPath);
    // tab-separated CSV format with three string columns
    FormatDescriptor csvDesc = new Csv()
        .field("a", "string")
        .field("b", "string")
        .field("c", "string")
        .fieldDelimiter("\t");
    Schema schemaDesc = new Schema()
        .field("a", "string")
        .field("b", "string")
        .field("c", "string");
    // t1 is the source table, t2 is the sink table
    ExternalCatalogTable t1 = ExternalCatalogTable.builder(connDescIn)
        .withFormat(csvDesc)
        .withSchema(schemaDesc)
        .asTableSource();
    ExternalCatalogTable t2 = ExternalCatalogTable.builder(connDescOut)
        .withFormat(csvDesc)
        .withSchema(schemaDesc)
        .asTableSink();
    catalog.createTable("t1", t1, true);
    catalog.createTable("t2", t2, true);

    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    final BatchTableEnvironment btEnv = TableEnvironment.getTableEnvironment(env);
    btEnv.registerExternalCatalog("test", catalog);
    // this does not work ---------------------------------------
    // ERROR: No table was registered under the name test.t2
    btEnv.scan("test", "t1").insertInto("test.t2");
    // this works ---------------------------------------
    btEnv.scan("test", "t1").writeToSink(new CsvTableSink(outPath, "\t", 1, WriteMode.OVERWRITE));
    env.execute();
  }
}


Best,
Flavio

Re: Java Table API and external catalog bug?

Posted by Fabian Hueske <fh...@gmail.com>.
IIRC, that was recently fixed.
Might come out with 1.6.2 / 1.7.0.

Cheers, Fabian


Flavio Pompermaier <po...@okkam.it> wrote on Thu, 25 Oct 2018,
14:09:

> Ok thanks! I wasn't aware of this... That would undoubtedly be useful ;)
> On Thu, Oct 25, 2018 at 2:00 PM Timo Walther <tw...@apache.org> wrote:
>
>> Hi Flavio,
>>
>> The external catalog support is not feature-complete yet. I think you can
>> only specify the catalog when reading from a table, but `insertInto` does
>> not consider the catalog name.
>>
>> Regards,
>> Timo
>>
>>
>> On 25.10.18 at 10:04, Flavio Pompermaier wrote:
>>
>> Any other help here? Is this a bug, or is something wrong in my code?
>>
>> On Tue, Oct 23, 2018 at 9:02 AM Flavio Pompermaier <po...@okkam.it>
>> wrote:
>>
>>> I've tried with t2, test.t2 and test.test.t2.
>>>
>>> On Mon, 22 Oct 2018, 19:26 Zhang, Xuefu, <xu...@alibaba-inc.com>
>>> wrote:
>>>
>>>> Have you tried "t2" instead of "test.t2"? There is a possibility that
>>>> catalog name isn't part of the table name in the table API.
>>>>
>>>> Thanks,
>>>> Xuefu

Re: Java Table API and external catalog bug?

Posted by Flavio Pompermaier <po...@okkam.it>.
Ok thanks! I wasn't aware of this... That would undoubtedly be useful ;)
On Thu, Oct 25, 2018 at 2:00 PM Timo Walther <tw...@apache.org> wrote:

> Hi Flavio,
>
> The external catalog support is not feature-complete yet. I think you can
> only specify the catalog when reading from a table, but `insertInto` does
> not consider the catalog name.
>
> Regards,
> Timo
>
>
> On 25.10.18 at 10:04, Flavio Pompermaier wrote:
>
> Any other help here? Is this a bug, or is something wrong in my code?
>
> On Tue, Oct 23, 2018 at 9:02 AM Flavio Pompermaier <po...@okkam.it>
> wrote:
>
>> I've tried with t2, test.t2 and test.test.t2.
>>
>> On Mon, 22 Oct 2018, 19:26 Zhang, Xuefu, <xu...@alibaba-inc.com> wrote:
>>
>>> Have you tried "t2" instead of "test.t2"? There is a possibility that
>>> catalog name isn't part of the table name in the table API.
>>>
>>> Thanks,
>>> Xuefu

Re: Java Table API and external catalog bug?

Posted by Timo Walther <tw...@apache.org>.
Hi Flavio,

The external catalog support is not feature-complete yet. I think you
can only specify the catalog when reading from a table, but `insertInto`
does not consider the catalog name.
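
A toy model of the behaviour described above, assuming that reads walk the catalog path while `insertInto` looks a name up only among tables registered directly in the environment. This is not Flink's code: `CatalogLookupSketch` and all of its methods are hypothetical names used for illustration only.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Toy model only (hypothetical names, not Flink classes).
public class CatalogLookupSketch {
    // Tables registered directly in the environment, keyed by plain name.
    private final Map<String, String> rootTables = new HashMap<>();
    // External catalogs, keyed by catalog name, each with its own tables.
    private final Map<String, Map<String, String>> catalogs = new HashMap<>();

    public void registerTable(String name, String table) {
        rootTables.put(name, table);
    }

    public void registerCatalogTable(String catalog, String name, String table) {
        catalogs.computeIfAbsent(catalog, k -> new HashMap<>()).put(name, table);
    }

    // Read path: one element means a directly registered table,
    // two elements mean (catalog, table), as in scan("test", "t1").
    public Optional<String> scan(String... path) {
        if (path.length == 1) {
            return Optional.ofNullable(rootTables.get(path[0]));
        }
        if (path.length == 2) {
            Map<String, String> catalog = catalogs.get(path[0]);
            return catalog == null
                ? Optional.empty()
                : Optional.ofNullable(catalog.get(path[1]));
        }
        return Optional.empty();
    }

    // Write path (assumed): the whole string is one flat name,
    // so "test.t2" is never split into catalog and table.
    public Optional<String> resolveSink(String name) {
        return Optional.ofNullable(rootTables.get(name));
    }

    public static void main(String[] args) {
        CatalogLookupSketch env = new CatalogLookupSketch();
        env.registerCatalogTable("test", "t1", "source");
        env.registerCatalogTable("test", "t2", "sink");

        System.out.println("scan(test, t1) found: " + env.scan("test", "t1").isPresent());
        System.out.println("sink test.t2 found:   " + env.resolveSink("test.t2").isPresent());
        System.out.println("sink t2 found:        " + env.resolveSink("t2").isPresent());
    }
}
```

Under this assumption, none of the names tried in this thread can resolve as a sink, because the sink exists only inside the catalog. Passing a sink object directly, as the working `writeToSink` call does, avoids the name lookup altogether.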

Regards,
Timo


On 25.10.18 at 10:04, Flavio Pompermaier wrote:
> Any other help here? Is this a bug, or is something wrong in my code?
>
> On Tue, Oct 23, 2018 at 9:02 AM Flavio Pompermaier
> <pompermaier@okkam.it> wrote:
>
>     I've tried with t2, test.t2 and test.test.t2.
>
>     On Mon, 22 Oct 2018, 19:26 Zhang, Xuefu, <xuefu.z@alibaba-inc.com>
>     wrote:
>
>         Have you tried "t2" instead of "test.t2"? There is a
>         possibility that catalog name isn't part of the table name in
>         the table API.
>
>         Thanks,
>         Xuefu


Re: Java Table API and external catalog bug?

Posted by Flavio Pompermaier <po...@okkam.it>.
Any other help here? Is this a bug, or is something wrong in my code?

On Tue, Oct 23, 2018 at 9:02 AM Flavio Pompermaier <po...@okkam.it>
wrote:

> I've tried with t2, test.t2 and test.test.t2.
>
> On Mon, 22 Oct 2018, 19:26 Zhang, Xuefu, <xu...@alibaba-inc.com> wrote:
>
>> Have you tried "t2" instead of "test.t2"? There is a possibility that
>> catalog name isn't part of the table name in the table API.
>>
>> Thanks,
>> Xuefu

Re: Java Table API and external catalog bug?

Posted by Flavio Pompermaier <po...@okkam.it>.
I've tried with t2, test.t2 and test.test.t2.

On Mon, 22 Oct 2018, 19:26 Zhang, Xuefu, <xu...@alibaba-inc.com> wrote:

> Have you tried "t2" instead of "test.t2"? There is a possibility that
> catalog name isn't part of the table name in the table API.
>
> Thanks,
> Xuefu

Re: Java Table API and external catalog bug?

Posted by "Zhang, Xuefu" <xu...@alibaba-inc.com>.
Have you tried "t2" instead of "test.t2"? There is a possibility that catalog name isn't part of the table name in the table API.

Thanks,
Xuefu

