You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Harshvardhan Shinde <ha...@oyorooms.com> on 2021/09/09 10:02:05 UTC

Issue while creating Hive table from Kafka topic

Hi,

I'm trying to run a simple Flink job that reads data from a Kafka topic and
creates a Hive table.

I'm following the steps from the Flink documentation:
<https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/hive/overview/#connecting-to-hive>

Here's my code:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

EnvironmentSettings settings =
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
TableEnvironment tableEnv = TableEnvironment.create(settings);

String name            = "myhive";
String defaultDatabase = "harsh_test";
String hiveConfDir     = "/etc/hive/conf";

HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir);
tableEnv.registerCatalog(name, hive);

// set the HiveCatalog as the current catalog of the session
tableEnv.useCatalog(name);

tableEnv.executeSql("CREATE TABLE IF NOT EXISTS transactions (\n" +
      "  `created_at` TIMESTAMP(3) METADATA FROM 'timestamp',\n" +
      "      `partition` BIGINT METADATA VIRTUAL,\n" +
      "      `offset` BIGINT METADATA VIRTUAL,\n" +
      "    account_id  BIGINT,\n" +
      "    amount      BIGINT,\n" +
      "    transaction_time TIMESTAMP(3),\n" +
      "    WATERMARK FOR transaction_time AS transaction_time - INTERVAL '5' SECOND\n" +
      ") WITH (\n" +
      "    'connector' = 'kafka',\n" +
      "    'topic'     = 'flink-stream-table',\n" +
      "    'properties.bootstrap.servers' = '<BROKER_ADDRESS>:9092',\n" +
      "   'scan.startup.mode' = 'earliest-offset',\n" +
      "    'format'    = 'csv'\n" +
      ")");

Table table = tableEnv.sqlQuery("Select * from transactions");
table.execute().print();

The code builds successfully, but I'm getting the following runtime error:

Caused by: java.util.concurrent.CompletionException:
java.lang.NoClassDefFoundError:
org/apache/kafka/common/serialization/ByteArrayDeserializer at
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
at
java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
at
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606)
..

Here are my pom.xml file contents:

<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements.  See the NOTICE file
distributed with this work for additional information
regarding copyright ownership.  The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License.  You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied.  See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.harsh.test</groupId>
    <artifactId>harsh-flink-test</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>Flink Quickstart Job</name>
    <url>http://www.myorganization.org</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.13.2</flink.version>
        <java.version>1.8</java.version>
        <hive.version>2.3.6</hive.version>
        <scala.binary.version>2.12</scala.binary.version>
        <maven.compiler.source>${java.version}</maven.compiler.source>
        <maven.compiler.target>${java.version}</maven.compiler.target>
    </properties>

    <repositories>
        <repository>
            <id>apache.snapshots</id>
            <name>Apache Development Snapshot Repository</name>
            <url>https://repository.apache.org/content/repositories/snapshots/</url>
            <releases>
                <enabled>false</enabled>
            </releases>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </repository>
    </repositories>

    <dependencies>
        <!-- Apache Flink dependencies -->
        <!-- These dependencies are provided, because they should not
be packaged into the JAR file. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- Add connector dependencies here. They must be in the
default scope (compile). -->

        <!-- Example:

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        -->

        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka
-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner
-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_2.12</artifactId>
            <version>1.13.2</version>
        </dependency>


        <!-- Flink Dependency -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-hive_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- Hive Dependency -->
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>${hive.version}</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/javax.servlet/javax.servlet-api
-->
        <dependency>
            <groupId>javax.servlet</groupId>
            <artifactId>javax.servlet-api</artifactId>
            <version>3.1.0</version>
            <scope>provided</scope>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.htrace/htrace-core4
-->
        <dependency>
            <groupId>org.apache.htrace</groupId>
            <artifactId>htrace-core4</artifactId>
            <version>4.0.1-incubating</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/commons-configuration/commons-configuration
-->
        <dependency>
            <groupId>commons-configuration</groupId>
            <artifactId>commons-configuration</artifactId>
            <version>1.10</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/commons-logging/commons-logging
-->
        <dependency>
            <groupId>commons-logging</groupId>
            <artifactId>commons-logging</artifactId>
            <version>1.2</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-shaded-hadoop-2
-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-shaded-hadoop-2</artifactId>
            <version>2.8.3-10.0</version>
        </dependency>


        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-hadoop-compatibility
-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-hadoop-compatibility_2.12</artifactId>
            <version>1.13.2</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-hadoop-fs
-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-hadoop-fs</artifactId>
            <version>1.13.2</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-csv -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>1.13.2</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-json -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>1.13.2</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-sql-connector-hive-2.3.6
-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-sql-connector-hive-2.3.6_2.12</artifactId>
            <version>1.13.2</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-core -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>1.13.2</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients
-->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.8.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.12</artifactId>
            <version>2.8.0</version>
        </dependency>


        <!-- Add logging framework, to produce console output when
running in the IDE. -->
        <!-- These dependencies are excluded from the application JAR
by default. -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.7</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
            <scope>runtime</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>

            <!-- Java Compiler -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>${java.version}</source>
                    <target>${java.version}</target>
                </configuration>
            </plugin>

            <!-- We use the maven-shade plugin to create a fat jar
that contains all necessary dependencies. -->
            <!-- Change the value of <mainClass>...</mainClass> if
your program entry point changes. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.0.0</version>
                <executions>
                    <!-- Run shade goal on package phase -->
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>

<exclude>org.apache.flink:force-shading</exclude>

<exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in
the META-INF folder.
                                    Otherwise, this might cause
SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">

<mainClass>com.harsh.test.StreamingJob</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>

        <pluginManagement>
            <plugins>

                <!-- This improves the out-of-the-box experience in
Eclipse by resolving some warnings. -->
                <plugin>
                    <groupId>org.eclipse.m2e</groupId>
                    <artifactId>lifecycle-mapping</artifactId>
                    <version>1.0.0</version>
                    <configuration>
                        <lifecycleMappingMetadata>
                            <pluginExecutions>
                                <pluginExecution>
                                    <pluginExecutionFilter>

<groupId>org.apache.maven.plugins</groupId>

<artifactId>maven-shade-plugin</artifactId>
                                        <versionRange>[3.0.0,)</versionRange>
                                        <goals>
                                            <goal>shade</goal>
                                        </goals>
                                    </pluginExecutionFilter>
                                    <action>
                                        <ignore/>
                                    </action>
                                </pluginExecution>
                                <pluginExecution>
                                    <pluginExecutionFilter>

<groupId>org.apache.maven.plugins</groupId>

<artifactId>maven-compiler-plugin</artifactId>
                                        <versionRange>[3.1,)</versionRange>
                                        <goals>
                                            <goal>testCompile</goal>
                                            <goal>compile</goal>
                                        </goals>
                                    </pluginExecutionFilter>
                                    <action>
                                        <ignore/>
                                    </action>
                                </pluginExecution>
                            </pluginExecutions>
                        </lifecycleMappingMetadata>
                    </configuration>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>

    <!-- This profile helps to make things run out of the box in IntelliJ -->
    <!-- It adds Flink's core classes to the runtime class path. -->
    <!-- Otherwise they are missing in IntelliJ, because the
dependency is 'provided' -->
    <profiles>
        <profile>
            <id>add-dependencies-for-IDEA</id>

            <activation>
                <property>
                    <name>idea.version</name>
                </property>
            </activation>

            <dependencies>
                <dependency>
                    <groupId>org.apache.flink</groupId>
                    <artifactId>flink-java</artifactId>
                    <version>${flink.version}</version>
                    <scope>compile</scope>
                </dependency>
                <dependency>
                    <groupId>org.apache.flink</groupId>

<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
                    <version>${flink.version}</version>
                    <scope>compile</scope>
                </dependency>
            </dependencies>
        </profile>
    </profiles>

</project>


Please help me resolve the issue.

Thanks

Re: Issue while creating Hive table from Kafka topic

Posted by Harshvardhan Shinde <ha...@oyorooms.com>.
Hi,
I checked for the dependency using the Stack Overflow link you sent. The
ByteArrayDeserializer class is present in my jar, but when I try to run it,
I'm still getting the same error message.
I also removed the kafka and kafka-clients dependencies from my pom.xml file
and added the jar from the link you provided.

On Fri, Sep 10, 2021 at 4:46 PM Timo Walther <tw...@apache.org> wrote:

> It seems that your Kafka clients dependency is not in your JAR file.
>
>
> A missing ByteArrayDeserializer is a symptom that seems to occur often. At
> least, I can find a similar question on Stack Overflow:
>
>
> https://stackoverflow.com/questions/58408494/flink-1-3-class-not-found-org-apache-kafka-common-bytearraydeserializer
>
> Maybe this helps.
>
> Btw for Table API and SQL, you could also try out the fat JAR that we
> provide for the SQL Client:
>
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/kafka/
>
> links to
>
>
> https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.11/1.13.2/flink-sql-connector-kafka_2.11-1.13.2.jar
>
> Here we try to do the dependency management for users.
>
> Regards,
> Timo
>
> On 10.09.21 11:20, Harshvardhan Shinde wrote:
> > I'm unable to figure out which dependency to add in order for the
> > ByteArrayDeserializer class to get included in the jar. I have added all
> > the dependencies as per the documentation, but I'm still unable to figure
> > out the cause.
> >
> > On Fri, Sep 10, 2021 at 12:17 AM Robert Metzger <rmetzger@apache.org
> > <ma...@apache.org>> wrote:
> >
> >     Does the jar file you are trying to submit contain the
> >     org/apache/kafka/common/serialization/ByteArrayDeserializer class?
> >
> >     On Thu, Sep 9, 2021 at 2:10 PM Harshvardhan Shinde
> >     <harshvardhan.shinde@oyorooms.com
> >     <ma...@oyorooms.com>> wrote:
> >
> >         Here's the complete stack trace:
> >
> >         Server
> >
>  Response:org.apache.flink.runtime.rest.handler.RestHandlerException:
> >         Could not execute application. at
> >
>  org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$1(JarRunHandler.java:108)
> >         at
> >
>  java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836)
> >         at
> >
>  java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811)
> >         at
> >
>  java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
> >         at
> >
>  java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1609)
> >         at
> >
>  java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> >         at java.util.concurrent.FutureTask.run(FutureTask.java:266) at
> >
>  java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> >         at
> >
>  java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> >         at
> >
>  java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> >         at
> >
>  java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> >         at java.lang.Thread.run(Thread.java:748) Caused by:
> >         java.util.concurrent.CompletionException:
> >         java.lang.NoClassDefFoundError:
> >         org/apache/kafka/common/serialization/ByteArrayDeserializer at
> >
>  java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
> >         at
> >
>  java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
> >         at
> >
>  java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606)
> >         ... 7 more Caused by: java.lang.NoClassDefFoundError:
> >         org/apache/kafka/common/serialization/ByteArrayDeserializer at
> >
>  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.setDeserializer(FlinkKafkaConsumer.java:322)
> >         at
> >
>  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.<init>(FlinkKafkaConsumer.java:223)
> >         at
> >
>  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.<init>(FlinkKafkaConsumer.java:154)
> >         at
> >
>  org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource.createKafkaConsumer(KafkaDynamicSource.java:383)
> >         at
> >
>  org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource.getScanRuntimeProvider(KafkaDynamicSource.java:205)
> >         at
> >
>  org.apache.flink.table.planner.connectors.DynamicSourceUtils.validateScanSource(DynamicSourceUtils.java:453)
> >         at
> >
>  org.apache.flink.table.planner.connectors.DynamicSourceUtils.prepareDynamicSource(DynamicSourceUtils.java:161)
> >         at
> >
>  org.apache.flink.table.planner.connectors.DynamicSourceUtils.convertSourceToRel(DynamicSourceUtils.java:119)
> >         at
> >
>  org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:85)
> >         at
> >
>  org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3585)
> >         at
> >
>  org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2507)
> >         at
> >
>  org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2144)
> >         at
> >
>  org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2093)
> >         at
> >
>  org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050)
> >         at
> >
>  org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:663)
> >         at
> >
>  org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644)
> >         at
> >
>  org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438)
> >         at
> >
>  org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:570)
> >         at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
> >         <
> http://org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
> >$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:169)
> >         at
> >
>  org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:161)
> >         at
> >
>  org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:989)
> >         at
> >
>  org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:958)
> >         at
> >
>  org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:283)
> >         at
> >
>  org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101)
> >         at
> >
>  org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:704)
> >         at com.harsh.test.StreamingJob.main(StreamingJob.java:106) at
> >         sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at
> >
>  sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> >         at
> >
>  sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >         at java.lang.reflect.Method.invoke(Method.java:498) at
> >
>  org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
> >         at
> >
>  org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
> >         at
> >
>  org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
> >         at
> >
>  org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:84)
> >         at
> >
>  org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:70)
> >         at
> >
>  org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$0(JarRunHandler.java:102)
> >         at
> >
>  java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
> >         ... 7 more
> >
> >         On Thu, Sep 9, 2021 at 5:36 PM Robert Metzger
> >         <rmetzger@apache.org <ma...@apache.org>> wrote:
> >
> >             Can you share the full stack trace, not just a part of it?
> >
> >             On Thu, Sep 9, 2021 at 1:43 PM Harshvardhan Shinde
> >             <harshvardhan.shinde@oyorooms.com
> >             <ma...@oyorooms.com>> wrote:
> >
> >                 Hi,
> >
> >                 I added the dependencies while trying to resolve the
> >                 same issue, thought I was missing them.
> >
> >                 Thanks
> >
> >                 On Thu, Sep 9, 2021 at 4:26 PM Robert Metzger
> >                 <rmetzger@apache.org <ma...@apache.org>>
> wrote:
> >
> >                     Hey,
> >
> >                     Why do you have these dependencies in your pom?
> >
> >                     <!--
> >
> https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients
> >                     <
> https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients>
> >                     -->
> >                     <dependency>
> >                     <groupId>org.apache.kafka</groupId>
> >                     <artifactId>kafka-clients</artifactId>
> >                     <version>2.8.0</version>
> >                     </dependency>
> >
> >                     <dependency>
> >                     <groupId>org.apache.kafka</groupId>
> >                     <artifactId>kafka_2.12</artifactId>
> >                     <version>2.8.0</version>
> >                     </dependency>
> >
> >
> >                     They are not needed for using the Kafka connector of
> >                     Flink (the Flink Kafka connector dependency pulls in
> >                     the required dependencies).
> >
> >
> >                     On Thu, Sep 9, 2021 at 12:02 PM Harshvardhan Shinde
> >                     <harshvardhan.shinde@oyorooms.com
> >                     <ma...@oyorooms.com>> wrote:
> >
> >                         Hi,
> >
> >                         I'm trying a simple flink job that reads data
> >                         from a kafka topic and creates a Hive table.
> >
> >                         I'm following the steps from here
> >                         <
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/hive/overview/#connecting-to-hive
> >.
> >
> >                         Here's my code:
> >
> >                         import
> org.apache.flink.table.api.EnvironmentSettings;
> >                         import org.apache.flink.table.api.Table;
> >                         import
> org.apache.flink.table.api.TableEnvironment;
> >                         import
> org.apache.flink.table.catalog.hive.HiveCatalog;
> >
> >                         EnvironmentSettings settings =
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> >                         TableEnvironment tableEnv =
> TableEnvironment.create(settings);
> >
> >                         String name            ="myhive";
> >                         String defaultDatabase ="harsh_test";
> >                         String hiveConfDir     ="/etc/hive/conf";
> >
> >                         HiveCatalog hive =new HiveCatalog(name,
> defaultDatabase, hiveConfDir);
> >                         tableEnv.registerCatalog(name, hive);
> >
> >                         // set the HiveCatalog as the current catalog of
> >                         the session
> >                         tableEnv.useCatalog(name);
> >
> >                         tableEnv.executeSql("CREATE TABLE IF NOT EXISTS
> transactions (\n" +
> >                                " `created_at` TIMESTAMP(3) METADATA FROM
> >                         'timestamp',\n" +
> >                                " `partition` BIGINT METADATA VIRTUAL,\n"
> +
> >                                " `offset` BIGINT METADATA VIRTUAL,\n" +
> >                                " account_id BIGINT,\n" +
> >                                " amount BIGINT,\n" +
> >                                " transaction_time TIMESTAMP(3),\n" +
> >                                " WATERMARK FOR transaction_time AS
> >                         transaction_time - INTERVAL '5' SECOND\n" +
> >                                ") WITH (\n" +
> >                                " 'connector' = 'kafka',\n" +
> >                                " 'topic' = 'flink-stream-table',\n" +
> >                                " 'properties.bootstrap.servers' =
> >                         '<BROKER_ADDRESS>:9092',\n" +
> >                                " 'scan.startup.mode' =
> 'earliest-offset',\n" +
> >                                " 'format' = 'csv'\n" +
> >                                ")");
> >
> >                         Table table = tableEnv.sqlQuery("Select * from
> transactions");
> >                         table.execute().print();
> >
> >                         The code builds successfully, but I'm getting
> >                         the following runtime error:
> >
> >                         Caused by:
> >                         java.util.concurrent.CompletionException:
> >                         java.lang.NoClassDefFoundError:
> >
>  org/apache/kafka/common/serialization/ByteArrayDeserializer
> >                         at
> >
>  java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
> >                         at
> >
>  java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
> >                         at
> >
>  java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606)
> >                         ..
> >
> >                         Here are my pom.xml file contents:
> >
> >                         <!--
> >                         Licensed to the Apache Software Foundation (ASF)
> >                         under one
> >                         or more contributor license agreements. See the
> >                         NOTICE file
> >                         distributed with this work for additional
> >                         information
> >                         regarding copyright ownership. The ASF licenses
> >                         this file
> >                         to you under the Apache License, Version 2.0 (the
> >                         "License"); you may not use this file except in
> >                         compliance
> >                         with the License. You may obtain a copy of the
> >                         License at
> >
> >                         http://www.apache.org/licenses/LICENSE-2.0
> >                         <http://www.apache.org/licenses/LICENSE-2.0>
> >
> >                         Unless required by applicable law or agreed to
> >                         in writing,
> >                         software distributed under the License is
> >                         distributed on an
> >                         "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
> >                         OF ANY
> >                         KIND, either express or implied. See the License
> >                         for the
> >                         specific language governing permissions and
> >                         limitations
> >                         under the License.
> >                         -->
> >                         <project
> >                         xmlns="http://maven.apache.org/POM/4.0.0"
> >                         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
> >                         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
> >                         http://maven.apache.org/xsd/maven-4.0.0.xsd">
> >                         <modelVersion>4.0.0</modelVersion>
> >
> >                         <groupId>com.harsh.test</groupId>
> >                         <artifactId>harsh-flink-test</artifactId>
> >                         <version>1.0-SNAPSHOT</version>
> >                         <packaging>jar</packaging>
> >
> >                         <name>Flink Quickstart Job</name>
> >                         <url>http://www.myorganization.org</url>
> >
> >                         <properties>
> >
>  <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
> >                         <flink.version>1.13.2</flink.version>
> >                         <java.version>1.8</java.version>
> >                         <hive.version>2.3.6</hive.version>
> >                         <scala.binary.version>2.12</scala.binary.version>
> >
>  <maven.compiler.source>${java.version}</maven.compiler.source>
> >
>  <maven.compiler.target>${java.version}</maven.compiler.target>
> >                         </properties>
> >
> >                         <repositories>
> >                         <repository>
> >                         <id>apache.snapshots</id>
> >                         <name>Apache Development Snapshot
> Repository</name>
> >                         <url>https://repository.apache.org/content/repositories/snapshots/</url>
> >                         <releases>
> >                         <enabled>false</enabled>
> >                         </releases>
> >                         <snapshots>
> >                         <enabled>true</enabled>
> >                         </snapshots>
> >                         </repository>
> >                         </repositories>
> >
> >                         <dependencies>
> >                         <!-- Apache Flink dependencies -->
> >                         <!-- These dependencies are provided, because
> >                         they should not be packaged into the JAR file.
> -->
> >                         <dependency>
> >                         <groupId>org.apache.flink</groupId>
> >                         <artifactId>flink-java</artifactId>
> >                         <version>${flink.version}</version>
> >                         </dependency>
> >                         <dependency>
> >                         <groupId>org.apache.flink</groupId>
> >
>  <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
> >                         <version>${flink.version}</version>
> >                         </dependency>
> >
> >                         <!-- Add connector dependencies here. They must
> >                         be in the default scope (compile). -->
> >
> >                         <!-- Example:
> >
> >                         <dependency>
> >                         <groupId>org.apache.flink</groupId>
> >
>  <artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
> >                         <version>${flink.version}</version>
> >                         </dependency>
> >                         -->
> >
> >                         <!--
> >
> https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka
> >                         <
> https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka>
> >                         -->
> >                         <dependency>
> >                         <groupId>org.apache.flink</groupId>
> >
>  <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
> >                         <version>${flink.version}</version>
> >                         </dependency>
> >
> >                         <dependency>
> >                         <groupId>org.apache.flink</groupId>
> >
>  <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
> >                         <version>${flink.version}</version>
> >                         </dependency>
> >
> >                         <dependency>
> >                         <groupId>org.apache.flink</groupId>
> >
>  <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
> >                         <version>${flink.version}</version>
> >                         </dependency>
> >
> >                         <!--
> >
> https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner
> >                         <
> https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner>
> >                         -->
> >                         <dependency>
> >                         <groupId>org.apache.flink</groupId>
> >                         <artifactId>flink-table-planner_2.12</artifactId>
> >                         <version>1.13.2</version>
> >                         </dependency>
> >
> >
> >                         <!-- Flink Dependency -->
> >                         <dependency>
> >                         <groupId>org.apache.flink</groupId>
> >
>  <artifactId>flink-connector-hive_${scala.binary.version}</artifactId>
> >                         <version>${flink.version}</version>
> >                         </dependency>
> >
> >                         <!-- Hive Dependency -->
> >                         <dependency>
> >                         <groupId>org.apache.hive</groupId>
> >                         <artifactId>hive-exec</artifactId>
> >                         <version>${hive.version}</version>
> >                         </dependency>
> >
> >                         <!--
> >
> https://mvnrepository.com/artifact/javax.servlet/javax.servlet-api
> >                         <
> https://mvnrepository.com/artifact/javax.servlet/javax.servlet-api>
> >                         -->
> >                         <dependency>
> >                         <groupId>javax.servlet</groupId>
> >                         <artifactId>javax.servlet-api</artifactId>
> >                         <version>3.1.0</version>
> >                         <scope>provided</scope>
> >                         </dependency>
> >
> >                         <!--
> >
> https://mvnrepository.com/artifact/org.apache.htrace/htrace-core4
> >                         <
> https://mvnrepository.com/artifact/org.apache.htrace/htrace-core4>
> >                         -->
> >                         <dependency>
> >                         <groupId>org.apache.htrace</groupId>
> >                         <artifactId>htrace-core4</artifactId>
> >                         <version>4.0.1-incubating</version>
> >                         </dependency>
> >
> >                         <!--
> >
> https://mvnrepository.com/artifact/commons-configuration/commons-configuration
> >                         <
> https://mvnrepository.com/artifact/commons-configuration/commons-configuration
> >
> >                         -->
> >                         <dependency>
> >                         <groupId>commons-configuration</groupId>
> >                         <artifactId>commons-configuration</artifactId>
> >                         <version>1.10</version>
> >                         </dependency>
> >
> >                         <!--
> >
> https://mvnrepository.com/artifact/commons-logging/commons-logging
> >                         <
> https://mvnrepository.com/artifact/commons-logging/commons-logging>
> >                         -->
> >                         <dependency>
> >                         <groupId>commons-logging</groupId>
> >                         <artifactId>commons-logging</artifactId>
> >                         <version>1.2</version>
> >                         </dependency>
> >
> >                         <!--
> >
> https://mvnrepository.com/artifact/org.apache.flink/flink-shaded-hadoop-2
> >                         <
> https://mvnrepository.com/artifact/org.apache.flink/flink-shaded-hadoop-2>
> >                         -->
> >                         <dependency>
> >                         <groupId>org.apache.flink</groupId>
> >                         <artifactId>flink-shaded-hadoop-2</artifactId>
> >                         <version>2.8.3-10.0</version>
> >                         </dependency>
> >
> >
> >                         <!--
> >
> https://mvnrepository.com/artifact/org.apache.flink/flink-hadoop-compatibility
> >                         <
> https://mvnrepository.com/artifact/org.apache.flink/flink-hadoop-compatibility
> >
> >                         -->
> >                         <dependency>
> >                         <groupId>org.apache.flink</groupId>
> >
>  <artifactId>flink-hadoop-compatibility_2.12</artifactId>
> >                         <version>1.13.2</version>
> >                         </dependency>
> >
> >                         <!--
> >
> https://mvnrepository.com/artifact/org.apache.flink/flink-hadoop-fs
> >                         <
> https://mvnrepository.com/artifact/org.apache.flink/flink-hadoop-fs>
> >                         -->
> >                         <dependency>
> >                         <groupId>org.apache.flink</groupId>
> >                         <artifactId>flink-hadoop-fs</artifactId>
> >                         <version>1.13.2</version>
> >                         </dependency>
> >
> >                         <!--
> >
> https://mvnrepository.com/artifact/org.apache.flink/flink-csv
> >                         <
> https://mvnrepository.com/artifact/org.apache.flink/flink-csv>
> >                         -->
> >                         <dependency>
> >                         <groupId>org.apache.flink</groupId>
> >                         <artifactId>flink-csv</artifactId>
> >                         <version>1.13.2</version>
> >                         </dependency>
> >
> >                         <!--
> >
> https://mvnrepository.com/artifact/org.apache.flink/flink-json
> >                         <
> https://mvnrepository.com/artifact/org.apache.flink/flink-json>
> >                         -->
> >                         <dependency>
> >                         <groupId>org.apache.flink</groupId>
> >                         <artifactId>flink-json</artifactId>
> >                         <version>1.13.2</version>
> >                         </dependency>
> >
> >                         <!--
> >
> https://mvnrepository.com/artifact/org.apache.flink/flink-sql-connector-hive-1.2.2
> >                         <
> https://mvnrepository.com/artifact/org.apache.flink/flink-sql-connector-hive-1.2.2
> >
> >                         -->
> >                         <dependency>
> >                         <groupId>org.apache.flink</groupId>
> >
>  <artifactId>flink-sql-connector-hive-2.3.6_2.12</artifactId>
> >                         <version>1.13.2</version>
> >                         </dependency>
> >
> >                         <!--
> >
> https://mvnrepository.com/artifact/org.apache.flink/flink-core
> >                         <
> https://mvnrepository.com/artifact/org.apache.flink/flink-core>
> >                         -->
> >                         <dependency>
> >                         <groupId>org.apache.flink</groupId>
> >                         <artifactId>flink-core</artifactId>
> >                         <version>1.13.2</version>
> >                         </dependency>
> >
> >                         <!--
> >
> https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients
> >                         <
> https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients>
> >                         -->
> >                         <dependency>
> >                         <groupId>org.apache.kafka</groupId>
> >                         <artifactId>kafka-clients</artifactId>
> >                         <version>2.8.0</version>
> >                         </dependency>
> >
> >                         <dependency>
> >                         <groupId>org.apache.kafka</groupId>
> >                         <artifactId>kafka_2.12</artifactId>
> >                         <version>2.8.0</version>
> >                         </dependency>
> >
> >
> >                         <!-- Add logging framework, to produce console
> >                         output when running in the IDE. -->
> >                         <!-- These dependencies are excluded from the
> >                         application JAR by default. -->
> >                         <dependency>
> >                         <groupId>org.slf4j</groupId>
> >                         <artifactId>slf4j-log4j12</artifactId>
> >                         <version>1.7.7</version>
> >                         <scope>runtime</scope>
> >                         </dependency>
> >                         <dependency>
> >                         <groupId>log4j</groupId>
> >                         <artifactId>log4j</artifactId>
> >                         <version>1.2.17</version>
> >                         <scope>runtime</scope>
> >                         </dependency>
> >                         </dependencies>
> >
> >                         <build>
> >                         <plugins>
> >
> >                         <!-- Java Compiler -->
> >                         <plugin>
> >                         <groupId>org.apache.maven.plugins</groupId>
> >                         <artifactId>maven-compiler-plugin</artifactId>
> >                         <version>3.1</version>
> >                         <configuration>
> >                         <source>${java.version}</source>
> >                         <target>${java.version}</target>
> >                         </configuration>
> >                         </plugin>
> >
> >                         <!-- We use the maven-shade plugin to create a
> >                         fat jar that contains all necessary
> >                         dependencies. -->
> >                         <!-- Change the value of
> >                         <mainClass>...</mainClass> if your program entry
> >                         point changes. -->
> >                         <plugin>
> >                         <groupId>org.apache.maven.plugins</groupId>
> >                         <artifactId>maven-shade-plugin</artifactId>
> >                         <version>3.0.0</version>
> >                         <executions>
> >                         <!-- Run shade goal on package phase -->
> >                         <execution>
> >                         <phase>package</phase>
> >                         <goals>
> >                         <goal>shade</goal>
> >                         </goals>
> >                         <configuration>
> >                         <artifactSet>
> >                         <excludes>
> >                         <exclude>org.apache.flink:force-shading</exclude>
> >
>  <exclude>com.google.code.findbugs:jsr305</exclude>
> >                         <exclude>org.slf4j:*</exclude>
> >                         <exclude>log4j:*</exclude>
> >                         </excludes>
> >                         </artifactSet>
> >                         <filters>
> >                         <filter>
> >                         <!-- Do not copy the signatures in the META-INF
> >                         folder.
> >                         Otherwise, this might cause SecurityExceptions
> >                         when using the JAR. -->
> >                         <artifact>*:*</artifact>
> >                         <excludes>
> >                         <exclude>META-INF/*.SF</exclude>
> >                         <exclude>META-INF/*.DSA</exclude>
> >                         <exclude>META-INF/*.RSA</exclude>
> >                         </excludes>
> >                         </filter>
> >                         </filters>
> >                         <transformers>
> >                         <transformer
> >
>  implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
> >
>  <mainClass>com.harsh.test.StreamingJob</mainClass>
> >                         </transformer>
> >                         </transformers>
> >                         </configuration>
> >                         </execution>
> >                         </executions>
> >                         </plugin>
> >                         </plugins>
> >
> >                         <pluginManagement>
> >                         <plugins>
> >
> >                         <!-- This improves the out-of-the-box experience
> >                         in Eclipse by resolving some warnings. -->
> >                         <plugin>
> >                         <groupId>org.eclipse.m2e</groupId>
> >                         <artifactId>lifecycle-mapping</artifactId>
> >                         <version>1.0.0</version>
> >                         <configuration>
> >                         <lifecycleMappingMetadata>
> >                         <pluginExecutions>
> >                         <pluginExecution>
> >                         <pluginExecutionFilter>
> >                         <groupId>org.apache.maven.plugins</groupId>
> >                         <artifactId>maven-shade-plugin</artifactId>
> >                         <versionRange>[3.0.0,)</versionRange>
> >                         <goals>
> >                         <goal>shade</goal>
> >                         </goals>
> >                         </pluginExecutionFilter>
> >                         <action>
> >                         <ignore/>
> >                         </action>
> >                         </pluginExecution>
> >                         <pluginExecution>
> >                         <pluginExecutionFilter>
> >                         <groupId>org.apache.maven.plugins</groupId>
> >                         <artifactId>maven-compiler-plugin</artifactId>
> >                         <versionRange>[3.1,)</versionRange>
> >                         <goals>
> >                         <goal>testCompile</goal>
> >                         <goal>compile</goal>
> >                         </goals>
> >                         </pluginExecutionFilter>
> >                         <action>
> >                         <ignore/>
> >                         </action>
> >                         </pluginExecution>
> >                         </pluginExecutions>
> >                         </lifecycleMappingMetadata>
> >                         </configuration>
> >                         </plugin>
> >                         </plugins>
> >                         </pluginManagement>
> >                         </build>
> >
> >                         <!-- This profile helps to make things run out
> >                         of the box in IntelliJ -->
> >                         <!-- It adds Flink's core classes to the
> >                         runtime class path. -->
> >                         <!-- Otherwise they are missing in IntelliJ,
> >                         because the dependency is 'provided' -->
> >                         <profiles>
> >                         <profile>
> >                         <id>add-dependencies-for-IDEA</id>
> >
> >                         <activation>
> >                         <property>
> >                         <name>idea.version</name>
> >                         </property>
> >                         </activation>
> >
> >                         <dependencies>
> >                         <dependency>
> >                         <groupId>org.apache.flink</groupId>
> >                         <artifactId>flink-java</artifactId>
> >                         <version>${flink.version}</version>
> >                         <scope>compile</scope>
> >                         </dependency>
> >                         <dependency>
> >                         <groupId>org.apache.flink</groupId>
> >
>  <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
> >                         <version>${flink.version}</version>
> >                         <scope>compile</scope>
> >                         </dependency>
> >                         </dependencies>
> >                         </profile>
> >                         </profiles>
> >
> >                         </project>
> >
> >
> >                         Please help me resolve the issue.
> >
> >                         Thanks
> >
> >
> >
> >
> >
> >                 --
> >                 Thanks and Regards,
> >                 Harshvardhan
> >                 Data Platform
> >
> >
> >
> >         --
> >         Thanks and Regards,
> >         Harshvardhan
> >         Data Platform
> >
> >
> >
> > --
> > Thanks and Regards,
> > Harshvardhan
> > Data Platform
>
>

-- 
Thanks and Regards,
Harshvardhan
Data Platform

Re: Issue while creating Hive table from Kafka topic

Posted by Timo Walther <tw...@apache.org>.
It seems that your Kafka clients dependency is not in your JAR file.


A missing ByteArrayDeserializer class is a symptom that seems to occur
often. At least, I can find a similar question on Stack Overflow:

https://stackoverflow.com/questions/58408494/flink-1-3-class-not-found-org-apache-kafka-common-bytearraydeserializer

Maybe this helps.

Btw for Table API and SQL, you could also try out the fat JAR that we 
provide for the SQL Client:

https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/kafka/

links to

https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.11/1.13.2/flink-sql-connector-kafka_2.11-1.13.2.jar

With this fat JAR, we try to handle the dependency management for users.

Regards,
Timo

On 10.09.21 11:20, Harshvardhan Shinde wrote:
> I'm unable to figure out which dependency to add in order for the 
> ByteArrayDeserializer class to get included in the jar. I have added all 
> the dependencies as per the documentation, but I'm still unable to figure 
> out the cause.
> 
> On Fri, Sep 10, 2021 at 12:17 AM Robert Metzger <rmetzger@apache.org 
> <ma...@apache.org>> wrote:
> 
>     Does the jar file you are trying to submit contain
>     the org/apache/kafka/common/serialization/ByteArrayDeserializer class?
> 
>     On Thu, Sep 9, 2021 at 2:10 PM Harshvardhan Shinde
>     <harshvardhan.shinde@oyorooms.com
>     <ma...@oyorooms.com>> wrote:
> 
>         Here's the complete stack trace:
> 
>         Server
>         Response:org.apache.flink.runtime.rest.handler.RestHandlerException:
>         Could not execute application. at
>         org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$1(JarRunHandler.java:108)
>         at
>         java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836)
>         at
>         java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811)
>         at
>         java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
>         at
>         java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1609)
>         at
>         java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266) at
>         java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>         at
>         java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>         at
>         java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>         at
>         java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>         at java.lang.Thread.run(Thread.java:748) Caused by:
>         java.util.concurrent.CompletionException:
>         java.lang.NoClassDefFoundError:
>         org/apache/kafka/common/serialization/ByteArrayDeserializer at
>         java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
>         at
>         java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
>         at
>         java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606)
>         ... 7 more Caused by: java.lang.NoClassDefFoundError:
>         org/apache/kafka/common/serialization/ByteArrayDeserializer at
>         org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.setDeserializer(FlinkKafkaConsumer.java:322)
>         at
>         org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.<init>(FlinkKafkaConsumer.java:223)
>         at
>         org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.<init>(FlinkKafkaConsumer.java:154)
>         at
>         org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource.createKafkaConsumer(KafkaDynamicSource.java:383)
>         at
>         org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource.getScanRuntimeProvider(KafkaDynamicSource.java:205)
>         at
>         org.apache.flink.table.planner.connectors.DynamicSourceUtils.validateScanSource(DynamicSourceUtils.java:453)
>         at
>         org.apache.flink.table.planner.connectors.DynamicSourceUtils.prepareDynamicSource(DynamicSourceUtils.java:161)
>         at
>         org.apache.flink.table.planner.connectors.DynamicSourceUtils.convertSourceToRel(DynamicSourceUtils.java:119)
>         at
>         org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:85)
>         at
>         org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3585)
>         at
>         org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2507)
>         at
>         org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2144)
>         at
>         org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2093)
>         at
>         org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050)
>         at
>         org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:663)
>         at
>         org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644)
>         at
>         org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438)
>         at
>         org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:570)
>         at
>         org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:169)
>         at
>         org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:161)
>         at
>         org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:989)
>         at
>         org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:958)
>         at
>         org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:283)
>         at
>         org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101)
>         at
>         org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:704)
>         at com.harsh.test.StreamingJob.main(StreamingJob.java:106) at
>         sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at
>         sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>         at
>         sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:498) at
>         org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
>         at
>         org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
>         at
>         org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
>         at
>         org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:84)
>         at
>         org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:70)
>         at
>         org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$0(JarRunHandler.java:102)
>         at
>         java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
>         ... 7 more
> 
>         On Thu, Sep 9, 2021 at 5:36 PM Robert Metzger
>         <rmetzger@apache.org <ma...@apache.org>> wrote:
> 
>             Can you share the full stack trace, not just a part of it?
> 
>             On Thu, Sep 9, 2021 at 1:43 PM Harshvardhan Shinde
>             <harshvardhan.shinde@oyorooms.com
>             <ma...@oyorooms.com>> wrote:
> 
>                 Hi,
> 
>                 I added the dependencies while trying to resolve the
>                 same issue, thought I was missing them.
> 
>                 Thanks
> 
>                 On Thu, Sep 9, 2021 at 4:26 PM Robert Metzger
>                 <rmetzger@apache.org <ma...@apache.org>> wrote:
> 
>                     Hey,
> 
>                     Why do you have these dependencies in your pom?
> 
>                     <!--
>                     https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients
>                     <https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients>
>                     -->
>                     <dependency>
>                     <groupId>org.apache.kafka</groupId>
>                     <artifactId>kafka-clients</artifactId>
>                     <version>2.8.0</version>
>                     </dependency>
> 
>                     <dependency>
>                     <groupId>org.apache.kafka</groupId>
>                     <artifactId>kafka_2.12</artifactId>
>                     <version>2.8.0</version>
>                     </dependency>
> 
> 
>                     They are not needed for using the Kafka connector of
>                     Flink (the Flink Kafka connector dependency pulls in
>                     the required dependencies)
> 
> 
>                     On Thu, Sep 9, 2021 at 12:02 PM Harshvardhan Shinde
>                     <harshvardhan.shinde@oyorooms.com
>                     <ma...@oyorooms.com>> wrote:
> 
>                         Hi,
> 
>                         I'm trying a simple flink job that reads data
>                         from a kafka topic and creates a Hive table.
> 
>                         I'm following the steps from here
>                         <https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/hive/overview/#connecting-to-hive>.
> 
>                         Here's my code:
> 
>                         import org.apache.flink.table.api.EnvironmentSettings;
>                         import org.apache.flink.table.api.Table;
>                         import org.apache.flink.table.api.TableEnvironment;
>                         import org.apache.flink.table.catalog.hive.HiveCatalog;
> 
>                         EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>                         TableEnvironment tableEnv = TableEnvironment.create(settings);
> 
>                         String name            ="myhive";
>                         String defaultDatabase ="harsh_test";
>                         String hiveConfDir     ="/etc/hive/conf";
> 
>                         HiveCatalog hive =new HiveCatalog(name, defaultDatabase, hiveConfDir);
>                         tableEnv.registerCatalog(name, hive);
> 
>                         // set the HiveCatalog as the current catalog of
>                         the session
>                         tableEnv.useCatalog(name);
> 
>                         tableEnv.executeSql("CREATE TABLE IF NOT EXISTS transactions (\n" +
>                                " `created_at` TIMESTAMP(3) METADATA FROM
>                         'timestamp',\n" +
>                                " `partition` BIGINT METADATA VIRTUAL,\n" +
>                                " `offset` BIGINT METADATA VIRTUAL,\n" +
>                                " account_id BIGINT,\n" +
>                                " amount BIGINT,\n" +
>                                " transaction_time TIMESTAMP(3),\n" +
>                                " WATERMARK FOR transaction_time AS
>                         transaction_time - INTERVAL '5' SECOND\n" +
>                                ") WITH (\n" +
>                                " 'connector' = 'kafka',\n" +
>                                " 'topic' = 'flink-stream-table',\n" +
>                                " 'properties.bootstrap.servers' =
>                         '<BROKER_ADDRESS>:9092',\n" +
>                                " 'scan.startup.mode' = 'earliest-offset',\n" +
>                                " 'format' = 'csv'\n" +
>                                ")");
> 
>                         Table table = tableEnv.sqlQuery("Select * from transactions");
>                         table.execute().print();
> 
>                         The code builds successfully, but I'm getting
>                         the following runtime error:
> 
>                         Caused by:
>                         java.util.concurrent.CompletionException:
>                         java.lang.NoClassDefFoundError:
>                         org/apache/kafka/common/serialization/ByteArrayDeserializer
>                         at
>                         java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
>                         at
>                         java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
>                         at
>                         java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606)
>                         ..
> 
>                         Here are my pom.xml file contents:
> 
>                         <!--
>                         Licensed to the Apache Software Foundation (ASF)
>                         under one
>                         or more contributor license agreements. See the
>                         NOTICE file
>                         distributed with this work for additional
>                         information
>                         regarding copyright ownership. The ASF licenses
>                         this file
>                         to you under the Apache License, Version 2.0 (the
>                         "License"); you may not use this file except in
>                         compliance
>                         with the License. You may obtain a copy of the
>                         License at
> 
>                         http://www.apache.org/licenses/LICENSE-2.0
>                         <http://www.apache.org/licenses/LICENSE-2.0>
> 
>                         Unless required by applicable law or agreed to
>                         in writing,
>                         software distributed under the License is
>                         distributed on an
>                         "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
>                         OF ANY
>                         KIND, either express or implied. See the License
>                         for the
>                         specific language governing permissions and
>                         limitations
>                         under the License.
>                         -->
>                         <project
>                         xmlns="http://maven.apache.org/POM/4.0.0
>                         <http://maven.apache.org/POM/4.0.0>"
>                         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance
>                         <http://www.w3.org/2001/XMLSchema-instance>"
>                         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
>                         <http://maven.apache.org/POM/4.0.0>
>                         http://maven.apache.org/xsd/maven-4.0.0.xsd
>                         <http://maven.apache.org/xsd/maven-4.0.0.xsd>">
>                         <modelVersion>4.0.0</modelVersion>
> 
>                         <groupId>com.harsh.test</groupId>
>                         <artifactId>harsh-flink-test</artifactId>
>                         <version>1.0-SNAPSHOT</version>
>                         <packaging>jar</packaging>
> 
>                         <name>Flink Quickstart Job</name>
>                         <url>http://www.myorganization.org  <http://www.myorganization.org></url>
> 
>                         <properties>
>                         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
>                         <flink.version>1.13.2</flink.version>
>                         <java.version>1.8</java.version>
>                         <hive.version>2.3.6</hive.version>
>                         <scala.binary.version>2.12</scala.binary.version>
>                         <maven.compiler.source>${java.version}</maven.compiler.source>
>                         <maven.compiler.target>${java.version}</maven.compiler.target>
>                         </properties>
> 
>                         <repositories>
>                         <repository>
>                         <id>apache.snapshots</id>
>                         <name>Apache Development Snapshot Repository</name>
>                         <url>https://repository.apache.org/content/repositories/snapshots/  <https://repository.apache.org/content/repositories/snapshots/></url>
>                         <releases>
>                         <enabled>false</enabled>
>                         </releases>
>                         <snapshots>
>                         <enabled>true</enabled>
>                         </snapshots>
>                         </repository>
>                         </repositories>
> 
>                         <dependencies>
>                         <!-- Apache Flink dependencies -->
>                         <!-- These dependencies are provided, because
>                         they should not be packaged into the JAR file. -->
>                         <dependency>
>                         <groupId>org.apache.flink</groupId>
>                         <artifactId>flink-java</artifactId>
>                         <version>${flink.version}</version>
>                         </dependency>
>                         <dependency>
>                         <groupId>org.apache.flink</groupId>
>                         <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
>                         <version>${flink.version}</version>
>                         </dependency>
> 
>                         <!-- Add connector dependencies here. They must
>                         be in the default scope (compile). -->
> 
>                         <!-- Example:
> 
>                         <dependency>
>                         <groupId>org.apache.flink</groupId>
>                         <artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
>                         <version>${flink.version}</version>
>                         </dependency>
>                         -->
> 
>                         <!--
>                         https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka
>                         <https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka>
>                         -->
>                         <dependency>
>                         <groupId>org.apache.flink</groupId>
>                         <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
>                         <version>${flink.version}</version>
>                         </dependency>
> 
>                         <dependency>
>                         <groupId>org.apache.flink</groupId>
>                         <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
>                         <version>${flink.version}</version>
>                         </dependency>
> 
>                         <dependency>
>                         <groupId>org.apache.flink</groupId>
>                         <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
>                         <version>${flink.version}</version>
>                         </dependency>
> 
>                         <!--
>                         https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner
>                         <https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner>
>                         -->
>                         <dependency>
>                         <groupId>org.apache.flink</groupId>
>                         <artifactId>flink-table-planner_2.12</artifactId>
>                         <version>1.13.2</version>
>                         </dependency>
> 
> 
>                         <!-- Flink Dependency -->
>                         <dependency>
>                         <groupId>org.apache.flink</groupId>
>                         <artifactId>flink-connector-hive_${scala.binary.version}</artifactId>
>                         <version>${flink.version}</version>
>                         </dependency>
> 
>                         <!-- Hive Dependency -->
>                         <dependency>
>                         <groupId>org.apache.hive</groupId>
>                         <artifactId>hive-exec</artifactId>
>                         <version>${hive.version}</version>
>                         </dependency>
> 
>                         <!--
>                         https://mvnrepository.com/artifact/javax.servlet/javax.servlet-api
>                         <https://mvnrepository.com/artifact/javax.servlet/javax.servlet-api>
>                         -->
>                         <dependency>
>                         <groupId>javax.servlet</groupId>
>                         <artifactId>javax.servlet-api</artifactId>
>                         <version>3.1.0</version>
>                         <scope>provided</scope>
>                         </dependency>
> 
>                         <!--
>                         https://mvnrepository.com/artifact/org.apache.htrace/htrace-core4
>                         <https://mvnrepository.com/artifact/org.apache.htrace/htrace-core4>
>                         -->
>                         <dependency>
>                         <groupId>org.apache.htrace</groupId>
>                         <artifactId>htrace-core4</artifactId>
>                         <version>4.0.1-incubating</version>
>                         </dependency>
> 
>                         <!--
>                         https://mvnrepository.com/artifact/commons-configuration/commons-configuration
>                         <https://mvnrepository.com/artifact/commons-configuration/commons-configuration>
>                         -->
>                         <dependency>
>                         <groupId>commons-configuration</groupId>
>                         <artifactId>commons-configuration</artifactId>
>                         <version>1.10</version>
>                         </dependency>
> 
>                         <!--
>                         https://mvnrepository.com/artifact/commons-logging/commons-logging
>                         <https://mvnrepository.com/artifact/commons-logging/commons-logging>
>                         -->
>                         <dependency>
>                         <groupId>commons-logging</groupId>
>                         <artifactId>commons-logging</artifactId>
>                         <version>1.2</version>
>                         </dependency>
> 
>                         <!--
>                         https://mvnrepository.com/artifact/org.apache.flink/flink-shaded-hadoop-2
>                         <https://mvnrepository.com/artifact/org.apache.flink/flink-shaded-hadoop-2>
>                         -->
>                         <dependency>
>                         <groupId>org.apache.flink</groupId>
>                         <artifactId>flink-shaded-hadoop-2</artifactId>
>                         <version>2.8.3-10.0</version>
>                         </dependency>
> 
> 
>                         <!--
>                         https://mvnrepository.com/artifact/org.apache.flink/flink-hadoop-compatibility
>                         <https://mvnrepository.com/artifact/org.apache.flink/flink-hadoop-compatibility>
>                         -->
>                         <dependency>
>                         <groupId>org.apache.flink</groupId>
>                         <artifactId>flink-hadoop-compatibility_2.12</artifactId>
>                         <version>1.13.2</version>
>                         </dependency>
> 
>                         <!--
>                         https://mvnrepository.com/artifact/org.apache.flink/flink-hadoop-fs
>                         <https://mvnrepository.com/artifact/org.apache.flink/flink-hadoop-fs>
>                         -->
>                         <dependency>
>                         <groupId>org.apache.flink</groupId>
>                         <artifactId>flink-hadoop-fs</artifactId>
>                         <version>1.13.2</version>
>                         </dependency>
> 
>                         <!--
>                         https://mvnrepository.com/artifact/org.apache.flink/flink-csv
>                         <https://mvnrepository.com/artifact/org.apache.flink/flink-csv>
>                         -->
>                         <dependency>
>                         <groupId>org.apache.flink</groupId>
>                         <artifactId>flink-csv</artifactId>
>                         <version>1.13.2</version>
>                         </dependency>
> 
>                         <!--
>                         https://mvnrepository.com/artifact/org.apache.flink/flink-json
>                         <https://mvnrepository.com/artifact/org.apache.flink/flink-json>
>                         -->
>                         <dependency>
>                         <groupId>org.apache.flink</groupId>
>                         <artifactId>flink-json</artifactId>
>                         <version>1.13.2</version>
>                         </dependency>
> 
>                         <!--
>                         https://mvnrepository.com/artifact/org.apache.flink/flink-sql-connector-hive-1.2.2
>                         <https://mvnrepository.com/artifact/org.apache.flink/flink-sql-connector-hive-1.2.2>
>                         -->
>                         <dependency>
>                         <groupId>org.apache.flink</groupId>
>                         <artifactId>flink-sql-connector-hive-2.3.6_2.12</artifactId>
>                         <version>1.13.2</version>
>                         </dependency>
> 
>                         <!--
>                         https://mvnrepository.com/artifact/org.apache.flink/flink-core
>                         <https://mvnrepository.com/artifact/org.apache.flink/flink-core>
>                         -->
>                         <dependency>
>                         <groupId>org.apache.flink</groupId>
>                         <artifactId>flink-core</artifactId>
>                         <version>1.13.2</version>
>                         </dependency>
> 
>                         <!--
>                         https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients
>                         <https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients>
>                         -->
>                         <dependency>
>                         <groupId>org.apache.kafka</groupId>
>                         <artifactId>kafka-clients</artifactId>
>                         <version>2.8.0</version>
>                         </dependency>
> 
>                         <dependency>
>                         <groupId>org.apache.kafka</groupId>
>                         <artifactId>kafka_2.12</artifactId>
>                         <version>2.8.0</version>
>                         </dependency>
> 
> 
>                         <!-- Add logging framework, to produce console
>                         output when running in the IDE. -->
>                         <!-- These dependencies are excluded from the
>                         application JAR by default. -->
>                         <dependency>
>                         <groupId>org.slf4j</groupId>
>                         <artifactId>slf4j-log4j12</artifactId>
>                         <version>1.7.7</version>
>                         <scope>runtime</scope>
>                         </dependency>
>                         <dependency>
>                         <groupId>log4j</groupId>
>                         <artifactId>log4j</artifactId>
>                         <version>1.2.17</version>
>                         <scope>runtime</scope>
>                         </dependency>
>                         </dependencies>
> 
>                         <build>
>                         <plugins>
> 
>                         <!-- Java Compiler -->
>                         <plugin>
>                         <groupId>org.apache.maven.plugins</groupId>
>                         <artifactId>maven-compiler-plugin</artifactId>
>                         <version>3.1</version>
>                         <configuration>
>                         <source>${java.version}</source>
>                         <target>${java.version}</target>
>                         </configuration>
>                         </plugin>
> 
>                         <!-- We use the maven-shade plugin to create a
>                         fat jar that contains all necessary
>                         dependencies. -->
>                         <!-- Change the value of
>                         <mainClass>...</mainClass> if your program entry
>                         point changes. -->
>                         <plugin>
>                         <groupId>org.apache.maven.plugins</groupId>
>                         <artifactId>maven-shade-plugin</artifactId>
>                         <version>3.0.0</version>
>                         <executions>
>                         <!-- Run shade goal on package phase -->
>                         <execution>
>                         <phase>package</phase>
>                         <goals>
>                         <goal>shade</goal>
>                         </goals>
>                         <configuration>
>                         <artifactSet>
>                         <excludes>
>                         <exclude>org.apache.flink:force-shading</exclude>
>                         <exclude>com.google.code.findbugs:jsr305</exclude>
>                         <exclude>org.slf4j:*</exclude>
>                         <exclude>log4j:*</exclude>
>                         </excludes>
>                         </artifactSet>
>                         <filters>
>                         <filter>
>                         <!-- Do not copy the signatures in the META-INF
>                         folder.
>                         Otherwise, this might cause SecurityExceptions
>                         when using the JAR. -->
>                         <artifact>*:*</artifact>
>                         <excludes>
>                         <exclude>META-INF/*.SF</exclude>
>                         <exclude>META-INF/*.DSA</exclude>
>                         <exclude>META-INF/*.RSA</exclude>
>                         </excludes>
>                         </filter>
>                         </filters>
>                         <transformers>
>                         <transformer
>                         implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
>                         <mainClass>com.harsh.test.StreamingJob</mainClass>
>                         </transformer>
>                         </transformers>
>                         </configuration>
>                         </execution>
>                         </executions>
>                         </plugin>
>                         </plugins>
> 
>                         <pluginManagement>
>                         <plugins>
> 
>                         <!-- This improves the out-of-the-box experience
>                         in Eclipse by resolving some warnings. -->
>                         <plugin>
>                         <groupId>org.eclipse.m2e</groupId>
>                         <artifactId>lifecycle-mapping</artifactId>
>                         <version>1.0.0</version>
>                         <configuration>
>                         <lifecycleMappingMetadata>
>                         <pluginExecutions>
>                         <pluginExecution>
>                         <pluginExecutionFilter>
>                         <groupId>org.apache.maven.plugins</groupId>
>                         <artifactId>maven-shade-plugin</artifactId>
>                         <versionRange>[3.0.0,)</versionRange>
>                         <goals>
>                         <goal>shade</goal>
>                         </goals>
>                         </pluginExecutionFilter>
>                         <action>
>                         <ignore/>
>                         </action>
>                         </pluginExecution>
>                         <pluginExecution>
>                         <pluginExecutionFilter>
>                         <groupId>org.apache.maven.plugins</groupId>
>                         <artifactId>maven-compiler-plugin</artifactId>
>                         <versionRange>[3.1,)</versionRange>
>                         <goals>
>                         <goal>testCompile</goal>
>                         <goal>compile</goal>
>                         </goals>
>                         </pluginExecutionFilter>
>                         <action>
>                         <ignore/>
>                         </action>
>                         </pluginExecution>
>                         </pluginExecutions>
>                         </lifecycleMappingMetadata>
>                         </configuration>
>                         </plugin>
>                         </plugins>
>                         </pluginManagement>
>                         </build>
> 
>                         <!-- This profile helps to make things run out
>                         of the box in IntelliJ -->
>                         <!-- It adds Flink's core classes to the
>                         runtime class path. -->
>                         <!-- Otherwise they are missing in IntelliJ,
>                         because the dependency is 'provided' -->
>                         <profiles>
>                         <profile>
>                         <id>add-dependencies-for-IDEA</id>
> 
>                         <activation>
>                         <property>
>                         <name>idea.version</name>
>                         </property>
>                         </activation>
> 
>                         <dependencies>
>                         <dependency>
>                         <groupId>org.apache.flink</groupId>
>                         <artifactId>flink-java</artifactId>
>                         <version>${flink.version}</version>
>                         <scope>compile</scope>
>                         </dependency>
>                         <dependency>
>                         <groupId>org.apache.flink</groupId>
>                         <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
>                         <version>${flink.version}</version>
>                         <scope>compile</scope>
>                         </dependency>
>                         </dependencies>
>                         </profile>
>                         </profiles>
> 
>                         </project>
> 
> 
>                         Please help me resolve the issue.
> 
>                         Thanks
> 
> 
> 
> 
> 
>                 -- 
>                 Thanks and Regards,
>                 Harshvardhan
>                 Data Platform
> 
> 
> 
>         -- 
>         Thanks and Regards,
>         Harshvardhan
>         Data Platform
> 
> 
> 
> -- 
> Thanks and Regards,
> Harshvardhan
> Data Platform


Re: Issue while creating Hive table from Kafka topic

Posted by Harshvardhan Shinde <ha...@oyorooms.com>.
I'm unable to figure out which dependency to add in order for the
ByteArrayDeserializer class to get included in the jar. I have added all
the dependencies as per the documentation, but I am still unable to figure
out the cause.

On Fri, Sep 10, 2021 at 12:17 AM Robert Metzger <rm...@apache.org> wrote:

> Does the jar file you are trying to submit contain
> the org/apache/kafka/common/serialization/ByteArrayDeserializer class?
>
> On Thu, Sep 9, 2021 at 2:10 PM Harshvardhan Shinde <
> harshvardhan.shinde@oyorooms.com> wrote:
>
>> Here's the complete stack trace:
>>
>> Server Response:org.apache.flink.runtime.rest.handler.RestHandlerException:
>> Could not execute application. at
>> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$1(JarRunHandler.java:108)
>> at
>> java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836)
>> at
>> java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811)
>> at
>> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
>> at
>> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1609)
>> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>> at java.util.concurrent.FutureTask.run(FutureTask.java:266) at
>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>> at
>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>> at java.lang.Thread.run(Thread.java:748) Caused by:
>> java.util.concurrent.CompletionException: java.lang.NoClassDefFoundError:
>> org/apache/kafka/common/serialization/ByteArrayDeserializer at
>> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
>> at
>> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
>> at
>> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606)
>> ... 7 more Caused by: java.lang.NoClassDefFoundError:
>> org/apache/kafka/common/serialization/ByteArrayDeserializer at
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.setDeserializer(FlinkKafkaConsumer.java:322)
>> at
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.<init>(FlinkKafkaConsumer.java:223)
>> at
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.<init>(FlinkKafkaConsumer.java:154)
>> at
>> org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource.createKafkaConsumer(KafkaDynamicSource.java:383)
>> at
>> org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource.getScanRuntimeProvider(KafkaDynamicSource.java:205)
>> at
>> org.apache.flink.table.planner.connectors.DynamicSourceUtils.validateScanSource(DynamicSourceUtils.java:453)
>> at
>> org.apache.flink.table.planner.connectors.DynamicSourceUtils.prepareDynamicSource(DynamicSourceUtils.java:161)
>> at
>> org.apache.flink.table.planner.connectors.DynamicSourceUtils.convertSourceToRel(DynamicSourceUtils.java:119)
>> at
>> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:85)
>> at
>> org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3585)
>> at
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2507)
>> at
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2144)
>> at
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2093)
>> at
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050)
>> at
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:663)
>> at
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644)
>> at
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438)
>> at
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:570)
>> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:169)
>> at
>> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:161)
>> at
>> org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:989)
>> at
>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:958)
>> at
>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:283)
>> at
>> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101)
>> at
>> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:704)
>> at com.harsh.test.StreamingJob.main(StreamingJob.java:106) at
>> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:498) at
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
>> at
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
>> at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
>> at
>> org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:84)
>> at
>> org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:70)
>> at
>> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$0(JarRunHandler.java:102)
>> at
>> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
>> ... 7 more
>>
>> On Thu, Sep 9, 2021 at 5:36 PM Robert Metzger <rm...@apache.org>
>> wrote:
>>
>>> Can you share the full stack trace, not just a part of it?
>>>
>>> On Thu, Sep 9, 2021 at 1:43 PM Harshvardhan Shinde <
>>> harshvardhan.shinde@oyorooms.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I added those dependencies while trying to resolve this same
>>>> issue; I thought I was missing them.
>>>>
>>>> Thanks
>>>>
>>>> On Thu, Sep 9, 2021 at 4:26 PM Robert Metzger <rm...@apache.org>
>>>> wrote:
>>>>
>>>>> Hey,
>>>>>
>>>>> Why do you have these dependencies in your pom?
>>>>>
>>>>>         <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
>>>>>         <dependency>
>>>>>             <groupId>org.apache.kafka</groupId>
>>>>>             <artifactId>kafka-clients</artifactId>
>>>>>             <version>2.8.0</version>
>>>>>         </dependency>
>>>>>
>>>>>         <dependency>
>>>>>             <groupId>org.apache.kafka</groupId>
>>>>>             <artifactId>kafka_2.12</artifactId>
>>>>>             <version>2.8.0</version>
>>>>>         </dependency>
>>>>>
>>>>>
>>>>> They are not needed for using the Kafka connector of Flink (the Flink
>>>>> Kafka connector dependency pulls in the required dependencies).
>>>>>
>>>>>
>>>>> On Thu, Sep 9, 2021 at 12:02 PM Harshvardhan Shinde <
>>>>> harshvardhan.shinde@oyorooms.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I'm trying a simple flink job that reads data from a kafka topic and
>>>>>> creates a Hive table.
>>>>>>
>>>>>> I'm following the steps from here
>>>>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/hive/overview/#connecting-to-hive>
>>>>>> .
>>>>>>
>>>>>> Here's my code:
>>>>>>
>>>>>> import org.apache.flink.table.api.EnvironmentSettings;
>>>>>> import org.apache.flink.table.api.Table;
>>>>>> import org.apache.flink.table.api.TableEnvironment;
>>>>>> import org.apache.flink.table.catalog.hive.HiveCatalog;
>>>>>>
>>>>>> EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>>>>>> TableEnvironment tableEnv = TableEnvironment.create(settings);
>>>>>>
>>>>>> String name            = "myhive";
>>>>>> String defaultDatabase = "harsh_test";
>>>>>> String hiveConfDir     = "/etc/hive/conf";
>>>>>>
>>>>>> HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir);
>>>>>> tableEnv.registerCatalog(name, hive);
>>>>>>
>>>>>> // set the HiveCatalog as the current catalog of the session
>>>>>> tableEnv.useCatalog(name);
>>>>>>
>>>>>> tableEnv.executeSql("CREATE TABLE IF NOT EXISTS transactions (\n" +
>>>>>>       "  `created_at` TIMESTAMP(3) METADATA FROM 'timestamp',\n" +
>>>>>>       "      `partition` BIGINT METADATA VIRTUAL,\n" +
>>>>>>       "      `offset` BIGINT METADATA VIRTUAL,\n" +
>>>>>>       "    account_id  BIGINT,\n" +
>>>>>>       "    amount      BIGINT,\n" +
>>>>>>       "    transaction_time TIMESTAMP(3),\n" +
>>>>>>       "    WATERMARK FOR transaction_time AS transaction_time - INTERVAL '5' SECOND\n" +
>>>>>>       ") WITH (\n" +
>>>>>>       "    'connector' = 'kafka',\n" +
>>>>>>       "    'topic'     = 'flink-stream-table',\n" +
>>>>>>       "    'properties.bootstrap.servers' = '<BROKER_ADDRESS>:9092',\n" +
>>>>>>       "   'scan.startup.mode' = 'earliest-offset',\n" +
>>>>>>       "    'format'    = 'csv'\n" +
>>>>>>       ")");
>>>>>>
>>>>>> Table table = tableEnv.sqlQuery("Select * from transactions");
>>>>>> table.execute().print();
>>>>>>
>>>>>> The code builds successfully, but I'm getting the following runtime
>>>>>> error:
>>>>>>
>>>>>> Caused by: java.util.concurrent.CompletionException:
>>>>>> java.lang.NoClassDefFoundError:
>>>>>> org/apache/kafka/common/serialization/ByteArrayDeserializer at
>>>>>> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
>>>>>> at
>>>>>> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
>>>>>> at
>>>>>> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606)
>>>>>> ..
>>>>>>
>>>>>> Here are my pom.xml file contents:
>>>>>>
>>>>>> <!--
>>>>>> Licensed to the Apache Software Foundation (ASF) under one
>>>>>> or more contributor license agreements.  See the NOTICE file
>>>>>> distributed with this work for additional information
>>>>>> regarding copyright ownership.  The ASF licenses this file
>>>>>> to you under the Apache License, Version 2.0 (the
>>>>>> "License"); you may not use this file except in compliance
>>>>>> with the License.  You may obtain a copy of the License at
>>>>>>
>>>>>>   http://www.apache.org/licenses/LICENSE-2.0
>>>>>>
>>>>>> Unless required by applicable law or agreed to in writing,
>>>>>> software distributed under the License is distributed on an
>>>>>> "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
>>>>>> KIND, either express or implied.  See the License for the
>>>>>> specific language governing permissions and limitations
>>>>>> under the License.
>>>>>> -->
>>>>>> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>>>>>>          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
>>>>>>     <modelVersion>4.0.0</modelVersion>
>>>>>>
>>>>>>     <groupId>com.harsh.test</groupId>
>>>>>>     <artifactId>harsh-flink-test</artifactId>
>>>>>>     <version>1.0-SNAPSHOT</version>
>>>>>>     <packaging>jar</packaging>
>>>>>>
>>>>>>     <name>Flink Quickstart Job</name>
>>>>>>     <url>http://www.myorganization.org</url>
>>>>>>
>>>>>>     <properties>
>>>>>>         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
>>>>>>         <flink.version>1.13.2</flink.version>
>>>>>>         <java.version>1.8</java.version>
>>>>>>         <hive.version>2.3.6</hive.version>
>>>>>>         <scala.binary.version>2.12</scala.binary.version>
>>>>>>         <maven.compiler.source>${java.version}</maven.compiler.source>
>>>>>>         <maven.compiler.target>${java.version}</maven.compiler.target>
>>>>>>     </properties>
>>>>>>
>>>>>>     <repositories>
>>>>>>         <repository>
>>>>>>             <id>apache.snapshots</id>
>>>>>>             <name>Apache Development Snapshot Repository</name>
>>>>>>             <url>https://repository.apache.org/content/repositories/snapshots/</url>
>>>>>>             <releases>
>>>>>>                 <enabled>false</enabled>
>>>>>>             </releases>
>>>>>>             <snapshots>
>>>>>>                 <enabled>true</enabled>
>>>>>>             </snapshots>
>>>>>>         </repository>
>>>>>>     </repositories>
>>>>>>
>>>>>>     <dependencies>
>>>>>>         <!-- Apache Flink dependencies -->
>>>>>>         <!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
>>>>>>         <dependency>
>>>>>>             <groupId>org.apache.flink</groupId>
>>>>>>             <artifactId>flink-java</artifactId>
>>>>>>             <version>${flink.version}</version>
>>>>>>         </dependency>
>>>>>>         <dependency>
>>>>>>             <groupId>org.apache.flink</groupId>
>>>>>>             <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
>>>>>>             <version>${flink.version}</version>
>>>>>>         </dependency>
>>>>>>
>>>>>>         <!-- Add connector dependencies here. They must be in the default scope (compile). -->
>>>>>>
>>>>>>         <!-- Example:
>>>>>>
>>>>>>         <dependency>
>>>>>>             <groupId>org.apache.flink</groupId>
>>>>>>             <artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
>>>>>>             <version>${flink.version}</version>
>>>>>>         </dependency>
>>>>>>         -->
>>>>>>
>>>>>>         <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka -->
>>>>>>         <dependency>
>>>>>>             <groupId>org.apache.flink</groupId>
>>>>>>             <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
>>>>>>             <version>${flink.version}</version>
>>>>>>         </dependency>
>>>>>>
>>>>>>         <dependency>
>>>>>>             <groupId>org.apache.flink</groupId>
>>>>>>             <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
>>>>>>             <version>${flink.version}</version>
>>>>>>         </dependency>
>>>>>>
>>>>>>         <dependency>
>>>>>>             <groupId>org.apache.flink</groupId>
>>>>>>             <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
>>>>>>             <version>${flink.version}</version>
>>>>>>         </dependency>
>>>>>>
>>>>>>         <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner -->
>>>>>>         <dependency>
>>>>>>             <groupId>org.apache.flink</groupId>
>>>>>>             <artifactId>flink-table-planner_2.12</artifactId>
>>>>>>             <version>1.13.2</version>
>>>>>>         </dependency>
>>>>>>
>>>>>>
>>>>>>         <!-- Flink Dependency -->
>>>>>>         <dependency>
>>>>>>             <groupId>org.apache.flink</groupId>
>>>>>>             <artifactId>flink-connector-hive_${scala.binary.version}</artifactId>
>>>>>>             <version>${flink.version}</version>
>>>>>>         </dependency>
>>>>>>
>>>>>>         <!-- Hive Dependency -->
>>>>>>         <dependency>
>>>>>>             <groupId>org.apache.hive</groupId>
>>>>>>             <artifactId>hive-exec</artifactId>
>>>>>>             <version>${hive.version}</version>
>>>>>>         </dependency>
>>>>>>
>>>>>>         <!-- https://mvnrepository.com/artifact/javax.servlet/javax.servlet-api -->
>>>>>>         <dependency>
>>>>>>             <groupId>javax.servlet</groupId>
>>>>>>             <artifactId>javax.servlet-api</artifactId>
>>>>>>             <version>3.1.0</version>
>>>>>>             <scope>provided</scope>
>>>>>>         </dependency>
>>>>>>
>>>>>>         <!-- https://mvnrepository.com/artifact/org.apache.htrace/htrace-core4 -->
>>>>>>         <dependency>
>>>>>>             <groupId>org.apache.htrace</groupId>
>>>>>>             <artifactId>htrace-core4</artifactId>
>>>>>>             <version>4.0.1-incubating</version>
>>>>>>         </dependency>
>>>>>>
>>>>>>         <!-- https://mvnrepository.com/artifact/commons-configuration/commons-configuration -->
>>>>>>         <dependency>
>>>>>>             <groupId>commons-configuration</groupId>
>>>>>>             <artifactId>commons-configuration</artifactId>
>>>>>>             <version>1.10</version>
>>>>>>         </dependency>
>>>>>>
>>>>>>         <!-- https://mvnrepository.com/artifact/commons-logging/commons-logging -->
>>>>>>         <dependency>
>>>>>>             <groupId>commons-logging</groupId>
>>>>>>             <artifactId>commons-logging</artifactId>
>>>>>>             <version>1.2</version>
>>>>>>         </dependency>
>>>>>>
>>>>>>         <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-shaded-hadoop-2 -->
>>>>>>         <dependency>
>>>>>>             <groupId>org.apache.flink</groupId>
>>>>>>             <artifactId>flink-shaded-hadoop-2</artifactId>
>>>>>>             <version>2.8.3-10.0</version>
>>>>>>         </dependency>
>>>>>>
>>>>>>
>>>>>>         <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-hadoop-compatibility -->
>>>>>>         <dependency>
>>>>>>             <groupId>org.apache.flink</groupId>
>>>>>>             <artifactId>flink-hadoop-compatibility_2.12</artifactId>
>>>>>>             <version>1.13.2</version>
>>>>>>         </dependency>
>>>>>>
>>>>>>         <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-hadoop-fs -->
>>>>>>         <dependency>
>>>>>>             <groupId>org.apache.flink</groupId>
>>>>>>             <artifactId>flink-hadoop-fs</artifactId>
>>>>>>             <version>1.13.2</version>
>>>>>>         </dependency>
>>>>>>
>>>>>>         <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-csv -->
>>>>>>         <dependency>
>>>>>>             <groupId>org.apache.flink</groupId>
>>>>>>             <artifactId>flink-csv</artifactId>
>>>>>>             <version>1.13.2</version>
>>>>>>         </dependency>
>>>>>>
>>>>>>         <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-json -->
>>>>>>         <dependency>
>>>>>>             <groupId>org.apache.flink</groupId>
>>>>>>             <artifactId>flink-json</artifactId>
>>>>>>             <version>1.13.2</version>
>>>>>>         </dependency>
>>>>>>
>>>>>>         <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-sql-connector-hive-1.2.2 -->
>>>>>>         <dependency>
>>>>>>             <groupId>org.apache.flink</groupId>
>>>>>>             <artifactId>flink-sql-connector-hive-2.3.6_2.12</artifactId>
>>>>>>             <version>1.13.2</version>
>>>>>>         </dependency>
>>>>>>
>>>>>>         <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-core -->
>>>>>>         <dependency>
>>>>>>             <groupId>org.apache.flink</groupId>
>>>>>>             <artifactId>flink-core</artifactId>
>>>>>>             <version>1.13.2</version>
>>>>>>         </dependency>
>>>>>>
>>>>>>         <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
>>>>>>         <dependency>
>>>>>>             <groupId>org.apache.kafka</groupId>
>>>>>>             <artifactId>kafka-clients</artifactId>
>>>>>>             <version>2.8.0</version>
>>>>>>         </dependency>
>>>>>>
>>>>>>         <dependency>
>>>>>>             <groupId>org.apache.kafka</groupId>
>>>>>>             <artifactId>kafka_2.12</artifactId>
>>>>>>             <version>2.8.0</version>
>>>>>>         </dependency>
>>>>>>
>>>>>>
>>>>>>         <!-- Add logging framework, to produce console output when running in the IDE. -->
>>>>>>         <!-- These dependencies are excluded from the application JAR by default. -->
>>>>>>         <dependency>
>>>>>>             <groupId>org.slf4j</groupId>
>>>>>>             <artifactId>slf4j-log4j12</artifactId>
>>>>>>             <version>1.7.7</version>
>>>>>>             <scope>runtime</scope>
>>>>>>         </dependency>
>>>>>>         <dependency>
>>>>>>             <groupId>log4j</groupId>
>>>>>>             <artifactId>log4j</artifactId>
>>>>>>             <version>1.2.17</version>
>>>>>>             <scope>runtime</scope>
>>>>>>         </dependency>
>>>>>>     </dependencies>
>>>>>>
>>>>>>     <build>
>>>>>>         <plugins>
>>>>>>
>>>>>>             <!-- Java Compiler -->
>>>>>>             <plugin>
>>>>>>                 <groupId>org.apache.maven.plugins</groupId>
>>>>>>                 <artifactId>maven-compiler-plugin</artifactId>
>>>>>>                 <version>3.1</version>
>>>>>>                 <configuration>
>>>>>>                     <source>${java.version}</source>
>>>>>>                     <target>${java.version}</target>
>>>>>>                 </configuration>
>>>>>>             </plugin>
>>>>>>
>>>>>>             <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
>>>>>>             <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
>>>>>>             <plugin>
>>>>>>                 <groupId>org.apache.maven.plugins</groupId>
>>>>>>                 <artifactId>maven-shade-plugin</artifactId>
>>>>>>                 <version>3.0.0</version>
>>>>>>                 <executions>
>>>>>>                     <!-- Run shade goal on package phase -->
>>>>>>                     <execution>
>>>>>>                         <phase>package</phase>
>>>>>>                         <goals>
>>>>>>                             <goal>shade</goal>
>>>>>>                         </goals>
>>>>>>                         <configuration>
>>>>>>                             <artifactSet>
>>>>>>                                 <excludes>
>>>>>>                                     <exclude>org.apache.flink:force-shading</exclude>
>>>>>>                                     <exclude>com.google.code.findbugs:jsr305</exclude>
>>>>>>                                     <exclude>org.slf4j:*</exclude>
>>>>>>                                     <exclude>log4j:*</exclude>
>>>>>>                                 </excludes>
>>>>>>                             </artifactSet>
>>>>>>                             <filters>
>>>>>>                                 <filter>
>>>>>>                                     <!-- Do not copy the signatures in the META-INF folder.
>>>>>>                                     Otherwise, this might cause SecurityExceptions when using the JAR. -->
>>>>>>                                     <artifact>*:*</artifact>
>>>>>>                                     <excludes>
>>>>>>                                         <exclude>META-INF/*.SF</exclude>
>>>>>>                                         <exclude>META-INF/*.DSA</exclude>
>>>>>>                                         <exclude>META-INF/*.RSA</exclude>
>>>>>>                                     </excludes>
>>>>>>                                 </filter>
>>>>>>                             </filters>
>>>>>>                             <transformers>
>>>>>>                                 <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
>>>>>>                                     <mainClass>com.harsh.test.StreamingJob</mainClass>
>>>>>>                                 </transformer>
>>>>>>                             </transformers>
>>>>>>                         </configuration>
>>>>>>                     </execution>
>>>>>>                 </executions>
>>>>>>             </plugin>
>>>>>>         </plugins>
>>>>>>
>>>>>>         <pluginManagement>
>>>>>>             <plugins>
>>>>>>
>>>>>>                 <!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. -->
>>>>>>                 <plugin>
>>>>>>                     <groupId>org.eclipse.m2e</groupId>
>>>>>>                     <artifactId>lifecycle-mapping</artifactId>
>>>>>>                     <version>1.0.0</version>
>>>>>>                     <configuration>
>>>>>>                         <lifecycleMappingMetadata>
>>>>>>                             <pluginExecutions>
>>>>>>                                 <pluginExecution>
>>>>>>                                     <pluginExecutionFilter>
>>>>>>                                         <groupId>org.apache.maven.plugins</groupId>
>>>>>>                                         <artifactId>maven-shade-plugin</artifactId>
>>>>>>                                         <versionRange>[3.0.0,)</versionRange>
>>>>>>                                         <goals>
>>>>>>                                             <goal>shade</goal>
>>>>>>                                         </goals>
>>>>>>                                     </pluginExecutionFilter>
>>>>>>                                     <action>
>>>>>>                                         <ignore/>
>>>>>>                                     </action>
>>>>>>                                 </pluginExecution>
>>>>>>                                 <pluginExecution>
>>>>>>                                     <pluginExecutionFilter>
>>>>>>                                         <groupId>org.apache.maven.plugins</groupId>
>>>>>>                                         <artifactId>maven-compiler-plugin</artifactId>
>>>>>>                                         <versionRange>[3.1,)</versionRange>
>>>>>>                                         <goals>
>>>>>>                                             <goal>testCompile</goal>
>>>>>>                                             <goal>compile</goal>
>>>>>>                                         </goals>
>>>>>>                                     </pluginExecutionFilter>
>>>>>>                                     <action>
>>>>>>                                         <ignore/>
>>>>>>                                     </action>
>>>>>>                                 </pluginExecution>
>>>>>>                             </pluginExecutions>
>>>>>>                         </lifecycleMappingMetadata>
>>>>>>                     </configuration>
>>>>>>                 </plugin>
>>>>>>             </plugins>
>>>>>>         </pluginManagement>
>>>>>>     </build>
>>>>>>
>>>>>>     <!-- This profile helps to make things run out of the box in IntelliJ -->
>>>>>>     <!-- It adds Flink's core classes to the runtime class path. -->
>>>>>>     <!-- Otherwise they are missing in IntelliJ, because the dependency is 'provided' -->
>>>>>>     <profiles>
>>>>>>         <profile>
>>>>>>             <id>add-dependencies-for-IDEA</id>
>>>>>>
>>>>>>             <activation>
>>>>>>                 <property>
>>>>>>                     <name>idea.version</name>
>>>>>>                 </property>
>>>>>>             </activation>
>>>>>>
>>>>>>             <dependencies>
>>>>>>                 <dependency>
>>>>>>                     <groupId>org.apache.flink</groupId>
>>>>>>                     <artifactId>flink-java</artifactId>
>>>>>>                     <version>${flink.version}</version>
>>>>>>                     <scope>compile</scope>
>>>>>>                 </dependency>
>>>>>>                 <dependency>
>>>>>>                     <groupId>org.apache.flink</groupId>
>>>>>>                     <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
>>>>>>                     <version>${flink.version}</version>
>>>>>>                     <scope>compile</scope>
>>>>>>                 </dependency>
>>>>>>             </dependencies>
>>>>>>         </profile>
>>>>>>     </profiles>
>>>>>>
>>>>>> </project>
>>>>>>
>>>>>>
>>>>>> Please help me resolve the issue.
>>>>>>
>>>>>> Thanks
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>
>>>> --
>>>> Thanks and Regards,
>>>> Harshvardhan
>>>> Data Platform
>>>>
>>>
>>
>> --
>> Thanks and Regards,
>> Harshvardhan
>> Data Platform
>>
>

-- 
Thanks and Regards,
Harshvardhan
Data Platform

Re: Issue while creating Hive table from Kafka topic

Posted by Robert Metzger <rm...@apache.org>.
Does the jar file you are trying to submit contain
the org/apache/kafka/common/serialization/ByteArrayDeserializer class?

On Thu, Sep 9, 2021 at 2:10 PM Harshvardhan Shinde <
harshvardhan.shinde@oyorooms.com> wrote:

> Here's the complete stack trace:
>
> Server Response:org.apache.flink.runtime.rest.handler.RestHandlerException:
> Could not execute application. at
> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$1(JarRunHandler.java:108)
> at
> java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836)
> at
> java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811)
> at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
> at
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1609)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266) at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748) Caused by:
> java.util.concurrent.CompletionException: java.lang.NoClassDefFoundError:
> org/apache/kafka/common/serialization/ByteArrayDeserializer at
> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
> at
> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
> at
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606)
> ... 7 more Caused by: java.lang.NoClassDefFoundError:
> org/apache/kafka/common/serialization/ByteArrayDeserializer at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.setDeserializer(FlinkKafkaConsumer.java:322)
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.<init>(FlinkKafkaConsumer.java:223)
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.<init>(FlinkKafkaConsumer.java:154)
> at
> org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource.createKafkaConsumer(KafkaDynamicSource.java:383)
> at
> org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource.getScanRuntimeProvider(KafkaDynamicSource.java:205)
> at
> org.apache.flink.table.planner.connectors.DynamicSourceUtils.validateScanSource(DynamicSourceUtils.java:453)
> at
> org.apache.flink.table.planner.connectors.DynamicSourceUtils.prepareDynamicSource(DynamicSourceUtils.java:161)
> at
> org.apache.flink.table.planner.connectors.DynamicSourceUtils.convertSourceToRel(DynamicSourceUtils.java:119)
> at
> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:85)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3585)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2507)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2144)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2093)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:663)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:570)
> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:169)
> at
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:161)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:989)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:958)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:283)
> at
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:704)
> at com.harsh.test.StreamingJob.main(StreamingJob.java:106) at
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498) at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
> at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
> at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
> at
> org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:84)
> at
> org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:70)
> at
> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$0(JarRunHandler.java:102)
> at
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
> ... 7 more
>
> On Thu, Sep 9, 2021 at 5:36 PM Robert Metzger <rm...@apache.org> wrote:
>
>> Can you share the full stack trace, not just a part of it?
>>
>> On Thu, Sep 9, 2021 at 1:43 PM Harshvardhan Shinde <
>> harshvardhan.shinde@oyorooms.com> wrote:
>>
>>> Hi,
>>>
>>> I added those dependencies while trying to resolve this same issue; I
>>> thought I was missing them.
>>>
>>> Thanks
>>>
>>> On Thu, Sep 9, 2021 at 4:26 PM Robert Metzger <rm...@apache.org>
>>> wrote:
>>>
>>>> Hey,
>>>>
>>>> Why do you have these dependencies in your pom?
>>>>
>>>>         <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
>>>>         <dependency>
>>>>             <groupId>org.apache.kafka</groupId>
>>>>             <artifactId>kafka-clients</artifactId>
>>>>             <version>2.8.0</version>
>>>>         </dependency>
>>>>
>>>>         <dependency>
>>>>             <groupId>org.apache.kafka</groupId>
>>>>             <artifactId>kafka_2.12</artifactId>
>>>>             <version>2.8.0</version>
>>>>         </dependency>
>>>>
>>>>
>>>> They are not needed for using the Kafka connector of Flink (the Flink
>>>> Kafka connector dependency pulls in the required dependencies).
>>>>
>>>>
>>>> On Thu, Sep 9, 2021 at 12:02 PM Harshvardhan Shinde <
>>>> harshvardhan.shinde@oyorooms.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I'm trying a simple flink job that reads data from a kafka topic and
>>>>> creates a Hive table.
>>>>>
>>>>> I'm following the steps from here
>>>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/hive/overview/#connecting-to-hive>
>>>>> .
>>>>>
>>>>> Here's my code:
>>>>>
>>>>> import org.apache.flink.table.api.EnvironmentSettings;
>>>>> import org.apache.flink.table.api.Table;
>>>>> import org.apache.flink.table.api.TableEnvironment;
>>>>> import org.apache.flink.table.catalog.hive.HiveCatalog;
>>>>>
>>>>> EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>>>>> TableEnvironment tableEnv = TableEnvironment.create(settings);
>>>>>
>>>>> String name            = "myhive";
>>>>> String defaultDatabase = "harsh_test";
>>>>> String hiveConfDir     = "/etc/hive/conf";
>>>>>
>>>>> HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir);
>>>>> tableEnv.registerCatalog(name, hive);
>>>>>
>>>>> // set the HiveCatalog as the current catalog of the session
>>>>> tableEnv.useCatalog(name);
>>>>>
>>>>> tableEnv.executeSql("CREATE TABLE IF NOT EXISTS transactions (\n" +
>>>>>       "  `created_at` TIMESTAMP(3) METADATA FROM 'timestamp',\n" +
>>>>>       "      `partition` BIGINT METADATA VIRTUAL,\n" +
>>>>>       "      `offset` BIGINT METADATA VIRTUAL,\n" +
>>>>>       "    account_id  BIGINT,\n" +
>>>>>       "    amount      BIGINT,\n" +
>>>>>       "    transaction_time TIMESTAMP(3),\n" +
>>>>>       "    WATERMARK FOR transaction_time AS transaction_time - INTERVAL '5' SECOND\n" +
>>>>>       ") WITH (\n" +
>>>>>       "    'connector' = 'kafka',\n" +
>>>>>       "    'topic'     = 'flink-stream-table',\n" +
>>>>>       "    'properties.bootstrap.servers' = '<BROKER_ADDRESS>:9092',\n" +
>>>>>       "   'scan.startup.mode' = 'earliest-offset',\n" +
>>>>>       "    'format'    = 'csv'\n" +
>>>>>       ")");
>>>>>
>>>>> Table table = tableEnv.sqlQuery("Select * from transactions");
>>>>> table.execute().print();
>>>>>
>>>>> The code builds successfully, but I'm getting the following runtime
>>>>> error:
>>>>>
>>>>> Caused by: java.util.concurrent.CompletionException: java.lang.NoClassDefFoundError: org/apache/kafka/common/serialization/ByteArrayDeserializer
>>>>>     at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
>>>>>     at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
>>>>>     at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606)
>>>>>     ..
>>>>>
>>>>> Here are my pom.xml file contents:
>>>>>
>>>>> <!--
>>>>> Licensed to the Apache Software Foundation (ASF) under one
>>>>> or more contributor license agreements.  See the NOTICE file
>>>>> distributed with this work for additional information
>>>>> regarding copyright ownership.  The ASF licenses this file
>>>>> to you under the Apache License, Version 2.0 (the
>>>>> "License"); you may not use this file except in compliance
>>>>> with the License.  You may obtain a copy of the License at
>>>>>
>>>>>   http://www.apache.org/licenses/LICENSE-2.0
>>>>>
>>>>> Unless required by applicable law or agreed to in writing,
>>>>> software distributed under the License is distributed on an
>>>>> "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
>>>>> KIND, either express or implied.  See the License for the
>>>>> specific language governing permissions and limitations
>>>>> under the License.
>>>>> -->
>>>>> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>>>>>          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
>>>>>     <modelVersion>4.0.0</modelVersion>
>>>>>
>>>>>     <groupId>com.harsh.test</groupId>
>>>>>     <artifactId>harsh-flink-test</artifactId>
>>>>>     <version>1.0-SNAPSHOT</version>
>>>>>     <packaging>jar</packaging>
>>>>>
>>>>>     <name>Flink Quickstart Job</name>
>>>>>     <url>http://www.myorganization.org</url>
>>>>>
>>>>>     <properties>
>>>>>         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
>>>>>         <flink.version>1.13.2</flink.version>
>>>>>         <java.version>1.8</java.version>
>>>>>         <hive.version>2.3.6</hive.version>
>>>>>         <scala.binary.version>2.12</scala.binary.version>
>>>>>         <maven.compiler.source>${java.version}</maven.compiler.source>
>>>>>         <maven.compiler.target>${java.version}</maven.compiler.target>
>>>>>     </properties>
>>>>>
>>>>>     <repositories>
>>>>>         <repository>
>>>>>             <id>apache.snapshots</id>
>>>>>             <name>Apache Development Snapshot Repository</name>
>>>>>             <url>https://repository.apache.org/content/repositories/snapshots/</url>
>>>>>             <releases>
>>>>>                 <enabled>false</enabled>
>>>>>             </releases>
>>>>>             <snapshots>
>>>>>                 <enabled>true</enabled>
>>>>>             </snapshots>
>>>>>         </repository>
>>>>>     </repositories>
>>>>>
>>>>>     <dependencies>
>>>>>         <!-- Apache Flink dependencies -->
>>>>>         <!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
>>>>>         <dependency>
>>>>>             <groupId>org.apache.flink</groupId>
>>>>>             <artifactId>flink-java</artifactId>
>>>>>             <version>${flink.version}</version>
>>>>>         </dependency>
>>>>>         <dependency>
>>>>>             <groupId>org.apache.flink</groupId>
>>>>>             <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
>>>>>             <version>${flink.version}</version>
>>>>>         </dependency>
>>>>>
>>>>>         <!-- Add connector dependencies here. They must be in the default scope (compile). -->
>>>>>
>>>>>         <!-- Example:
>>>>>
>>>>>         <dependency>
>>>>>             <groupId>org.apache.flink</groupId>
>>>>>             <artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
>>>>>             <version>${flink.version}</version>
>>>>>         </dependency>
>>>>>         -->
>>>>>
>>>>>         <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka -->
>>>>>         <dependency>
>>>>>             <groupId>org.apache.flink</groupId>
>>>>>             <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
>>>>>             <version>${flink.version}</version>
>>>>>         </dependency>
>>>>>
>>>>>         <dependency>
>>>>>             <groupId>org.apache.flink</groupId>
>>>>>             <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
>>>>>             <version>${flink.version}</version>
>>>>>         </dependency>
>>>>>
>>>>>         <dependency>
>>>>>             <groupId>org.apache.flink</groupId>
>>>>>             <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
>>>>>             <version>${flink.version}</version>
>>>>>         </dependency>
>>>>>
>>>>>         <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner -->
>>>>>         <dependency>
>>>>>             <groupId>org.apache.flink</groupId>
>>>>>             <artifactId>flink-table-planner_2.12</artifactId>
>>>>>             <version>1.13.2</version>
>>>>>         </dependency>
>>>>>
>>>>>
>>>>>         <!-- Flink Dependency -->
>>>>>         <dependency>
>>>>>             <groupId>org.apache.flink</groupId>
>>>>>             <artifactId>flink-connector-hive_${scala.binary.version}</artifactId>
>>>>>             <version>${flink.version}</version>
>>>>>         </dependency>
>>>>>
>>>>>         <!-- Hive Dependency -->
>>>>>         <dependency>
>>>>>             <groupId>org.apache.hive</groupId>
>>>>>             <artifactId>hive-exec</artifactId>
>>>>>             <version>${hive.version}</version>
>>>>>         </dependency>
>>>>>
>>>>>         <!-- https://mvnrepository.com/artifact/javax.servlet/javax.servlet-api -->
>>>>>         <dependency>
>>>>>             <groupId>javax.servlet</groupId>
>>>>>             <artifactId>javax.servlet-api</artifactId>
>>>>>             <version>3.1.0</version>
>>>>>             <scope>provided</scope>
>>>>>         </dependency>
>>>>>
>>>>>         <!-- https://mvnrepository.com/artifact/org.apache.htrace/htrace-core4 -->
>>>>>         <dependency>
>>>>>             <groupId>org.apache.htrace</groupId>
>>>>>             <artifactId>htrace-core4</artifactId>
>>>>>             <version>4.0.1-incubating</version>
>>>>>         </dependency>
>>>>>
>>>>>         <!-- https://mvnrepository.com/artifact/commons-configuration/commons-configuration -->
>>>>>         <dependency>
>>>>>             <groupId>commons-configuration</groupId>
>>>>>             <artifactId>commons-configuration</artifactId>
>>>>>             <version>1.10</version>
>>>>>         </dependency>
>>>>>
>>>>>         <!-- https://mvnrepository.com/artifact/commons-logging/commons-logging -->
>>>>>         <dependency>
>>>>>             <groupId>commons-logging</groupId>
>>>>>             <artifactId>commons-logging</artifactId>
>>>>>             <version>1.2</version>
>>>>>         </dependency>
>>>>>
>>>>>         <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-shaded-hadoop-2 -->
>>>>>         <dependency>
>>>>>             <groupId>org.apache.flink</groupId>
>>>>>             <artifactId>flink-shaded-hadoop-2</artifactId>
>>>>>             <version>2.8.3-10.0</version>
>>>>>         </dependency>
>>>>>
>>>>>
>>>>>         <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-hadoop-compatibility -->
>>>>>         <dependency>
>>>>>             <groupId>org.apache.flink</groupId>
>>>>>             <artifactId>flink-hadoop-compatibility_2.12</artifactId>
>>>>>             <version>1.13.2</version>
>>>>>         </dependency>
>>>>>
>>>>>         <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-hadoop-fs -->
>>>>>         <dependency>
>>>>>             <groupId>org.apache.flink</groupId>
>>>>>             <artifactId>flink-hadoop-fs</artifactId>
>>>>>             <version>1.13.2</version>
>>>>>         </dependency>
>>>>>
>>>>>         <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-csv -->
>>>>>         <dependency>
>>>>>             <groupId>org.apache.flink</groupId>
>>>>>             <artifactId>flink-csv</artifactId>
>>>>>             <version>1.13.2</version>
>>>>>         </dependency>
>>>>>
>>>>>         <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-json -->
>>>>>         <dependency>
>>>>>             <groupId>org.apache.flink</groupId>
>>>>>             <artifactId>flink-json</artifactId>
>>>>>             <version>1.13.2</version>
>>>>>         </dependency>
>>>>>
>>>>>         <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-sql-connector-hive-2.3.6 -->
>>>>>         <dependency>
>>>>>             <groupId>org.apache.flink</groupId>
>>>>>             <artifactId>flink-sql-connector-hive-2.3.6_2.12</artifactId>
>>>>>             <version>1.13.2</version>
>>>>>         </dependency>
>>>>>
>>>>>         <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-core -->
>>>>>         <dependency>
>>>>>             <groupId>org.apache.flink</groupId>
>>>>>             <artifactId>flink-core</artifactId>
>>>>>             <version>1.13.2</version>
>>>>>         </dependency>
>>>>>
>>>>>         <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
>>>>>         <dependency>
>>>>>             <groupId>org.apache.kafka</groupId>
>>>>>             <artifactId>kafka-clients</artifactId>
>>>>>             <version>2.8.0</version>
>>>>>         </dependency>
>>>>>
>>>>>         <dependency>
>>>>>             <groupId>org.apache.kafka</groupId>
>>>>>             <artifactId>kafka_2.12</artifactId>
>>>>>             <version>2.8.0</version>
>>>>>         </dependency>
>>>>>
>>>>>
>>>>>         <!-- Add logging framework, to produce console output when running in the IDE. -->
>>>>>         <!-- These dependencies are excluded from the application JAR by default. -->
>>>>>         <dependency>
>>>>>             <groupId>org.slf4j</groupId>
>>>>>             <artifactId>slf4j-log4j12</artifactId>
>>>>>             <version>1.7.7</version>
>>>>>             <scope>runtime</scope>
>>>>>         </dependency>
>>>>>         <dependency>
>>>>>             <groupId>log4j</groupId>
>>>>>             <artifactId>log4j</artifactId>
>>>>>             <version>1.2.17</version>
>>>>>             <scope>runtime</scope>
>>>>>         </dependency>
>>>>>     </dependencies>
>>>>>
>>>>>     <build>
>>>>>         <plugins>
>>>>>
>>>>>             <!-- Java Compiler -->
>>>>>             <plugin>
>>>>>                 <groupId>org.apache.maven.plugins</groupId>
>>>>>                 <artifactId>maven-compiler-plugin</artifactId>
>>>>>                 <version>3.1</version>
>>>>>                 <configuration>
>>>>>                     <source>${java.version}</source>
>>>>>                     <target>${java.version}</target>
>>>>>                 </configuration>
>>>>>             </plugin>
>>>>>
>>>>>             <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
>>>>>             <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
>>>>>             <plugin>
>>>>>                 <groupId>org.apache.maven.plugins</groupId>
>>>>>                 <artifactId>maven-shade-plugin</artifactId>
>>>>>                 <version>3.0.0</version>
>>>>>                 <executions>
>>>>>                     <!-- Run shade goal on package phase -->
>>>>>                     <execution>
>>>>>                         <phase>package</phase>
>>>>>                         <goals>
>>>>>                             <goal>shade</goal>
>>>>>                         </goals>
>>>>>                         <configuration>
>>>>>                             <artifactSet>
>>>>>                                 <excludes>
>>>>>                                     <exclude>org.apache.flink:force-shading</exclude>
>>>>>                                     <exclude>com.google.code.findbugs:jsr305</exclude>
>>>>>                                     <exclude>org.slf4j:*</exclude>
>>>>>                                     <exclude>log4j:*</exclude>
>>>>>                                 </excludes>
>>>>>                             </artifactSet>
>>>>>                             <filters>
>>>>>                                 <filter>
>>>>>                                     <!-- Do not copy the signatures in the META-INF folder.
>>>>>                                     Otherwise, this might cause SecurityExceptions when using the JAR. -->
>>>>>                                     <artifact>*:*</artifact>
>>>>>                                     <excludes>
>>>>>                                         <exclude>META-INF/*.SF</exclude>
>>>>>                                         <exclude>META-INF/*.DSA</exclude>
>>>>>                                         <exclude>META-INF/*.RSA</exclude>
>>>>>                                     </excludes>
>>>>>                                 </filter>
>>>>>                             </filters>
>>>>>                             <transformers>
>>>>>                                 <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
>>>>>                                     <mainClass>com.harsh.test.StreamingJob</mainClass>
>>>>>                                 </transformer>
>>>>>                             </transformers>
>>>>>                         </configuration>
>>>>>                     </execution>
>>>>>                 </executions>
>>>>>             </plugin>
>>>>>         </plugins>
>>>>>
>>>>>         <pluginManagement>
>>>>>             <plugins>
>>>>>
>>>>>                 <!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. -->
>>>>>                 <plugin>
>>>>>                     <groupId>org.eclipse.m2e</groupId>
>>>>>                     <artifactId>lifecycle-mapping</artifactId>
>>>>>                     <version>1.0.0</version>
>>>>>                     <configuration>
>>>>>                         <lifecycleMappingMetadata>
>>>>>                             <pluginExecutions>
>>>>>                                 <pluginExecution>
>>>>>                                     <pluginExecutionFilter>
>>>>>                                         <groupId>org.apache.maven.plugins</groupId>
>>>>>                                         <artifactId>maven-shade-plugin</artifactId>
>>>>>                                         <versionRange>[3.0.0,)</versionRange>
>>>>>                                         <goals>
>>>>>                                             <goal>shade</goal>
>>>>>                                         </goals>
>>>>>                                     </pluginExecutionFilter>
>>>>>                                     <action>
>>>>>                                         <ignore/>
>>>>>                                     </action>
>>>>>                                 </pluginExecution>
>>>>>                                 <pluginExecution>
>>>>>                                     <pluginExecutionFilter>
>>>>>                                         <groupId>org.apache.maven.plugins</groupId>
>>>>>                                         <artifactId>maven-compiler-plugin</artifactId>
>>>>>                                         <versionRange>[3.1,)</versionRange>
>>>>>                                         <goals>
>>>>>                                             <goal>testCompile</goal>
>>>>>                                             <goal>compile</goal>
>>>>>                                         </goals>
>>>>>                                     </pluginExecutionFilter>
>>>>>                                     <action>
>>>>>                                         <ignore/>
>>>>>                                     </action>
>>>>>                                 </pluginExecution>
>>>>>                             </pluginExecutions>
>>>>>                         </lifecycleMappingMetadata>
>>>>>                     </configuration>
>>>>>                 </plugin>
>>>>>             </plugins>
>>>>>         </pluginManagement>
>>>>>     </build>
>>>>>
>>>>>     <!-- This profile helps to make things run out of the box in IntelliJ -->
>>>>>     <!-- It adds Flink's core classes to the runtime class path. -->
>>>>>     <!-- Otherwise they are missing in IntelliJ, because the dependency is 'provided' -->
>>>>>     <profiles>
>>>>>         <profile>
>>>>>             <id>add-dependencies-for-IDEA</id>
>>>>>
>>>>>             <activation>
>>>>>                 <property>
>>>>>                     <name>idea.version</name>
>>>>>                 </property>
>>>>>             </activation>
>>>>>
>>>>>             <dependencies>
>>>>>                 <dependency>
>>>>>                     <groupId>org.apache.flink</groupId>
>>>>>                     <artifactId>flink-java</artifactId>
>>>>>                     <version>${flink.version}</version>
>>>>>                     <scope>compile</scope>
>>>>>                 </dependency>
>>>>>                 <dependency>
>>>>>                     <groupId>org.apache.flink</groupId>
>>>>>                     <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
>>>>>                     <version>${flink.version}</version>
>>>>>                     <scope>compile</scope>
>>>>>                 </dependency>
>>>>>             </dependencies>
>>>>>         </profile>
>>>>>     </profiles>
>>>>>
>>>>> </project>
>>>>>
>>>>>
>>>>> Please help me resolve the issue.
>>>>>
>>>>> Thanks
>>>>>
>>>>>
>>>>>
>>>>>
>>>
>>> --
>>> Thanks and Regards,
>>> Harshvardhan
>>> Data Platform
>>>
>>
>
> --
> Thanks and Regards,
> Harshvardhan
> Data Platform
>

Re: Issue while creating Hive table from Kafka topic

Posted by Harshvardhan Shinde <ha...@oyorooms.com>.
Here's the complete stack trace:

Server Response:
org.apache.flink.runtime.rest.handler.RestHandlerException: Could not execute application.
    at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$1(JarRunHandler.java:108)
    at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836)
    at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1609)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.CompletionException: java.lang.NoClassDefFoundError: org/apache/kafka/common/serialization/ByteArrayDeserializer
    at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
    at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606)
    ... 7 more
Caused by: java.lang.NoClassDefFoundError: org/apache/kafka/common/serialization/ByteArrayDeserializer
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.setDeserializer(FlinkKafkaConsumer.java:322)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.<init>(FlinkKafkaConsumer.java:223)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.<init>(FlinkKafkaConsumer.java:154)
    at org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource.createKafkaConsumer(KafkaDynamicSource.java:383)
    at org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource.getScanRuntimeProvider(KafkaDynamicSource.java:205)
    at org.apache.flink.table.planner.connectors.DynamicSourceUtils.validateScanSource(DynamicSourceUtils.java:453)
    at org.apache.flink.table.planner.connectors.DynamicSourceUtils.prepareDynamicSource(DynamicSourceUtils.java:161)
    at org.apache.flink.table.planner.connectors.DynamicSourceUtils.convertSourceToRel(DynamicSourceUtils.java:119)
    at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:85)
    at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3585)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2507)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2144)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2093)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:663)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:570)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:169)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:161)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:989)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:958)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:283)
    at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:704)
    at com.harsh.test.StreamingJob.main(StreamingJob.java:106)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
    at org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:84)
    at org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:70)
    at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$0(JarRunHandler.java:102)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
    ... 7 more

On Thu, Sep 9, 2021 at 5:36 PM Robert Metzger <rm...@apache.org> wrote:

> Can you share the full stack trace, not just a part of it?
>
> On Thu, Sep 9, 2021 at 1:43 PM Harshvardhan Shinde <
> harshvardhan.shinde@oyorooms.com> wrote:
>
>> Hi,
>>
>> I added those dependencies while trying to resolve this same issue; I
>> thought I was missing them.
>>
>> Thanks
>>
>> On Thu, Sep 9, 2021 at 4:26 PM Robert Metzger <rm...@apache.org>
>> wrote:
>>
>>> Hey,
>>>
>>> Why do you have these dependencies in your pom?
>>>
>>>         <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
>>>         <dependency>
>>>             <groupId>org.apache.kafka</groupId>
>>>             <artifactId>kafka-clients</artifactId>
>>>             <version>2.8.0</version>
>>>         </dependency>
>>>
>>>         <dependency>
>>>             <groupId>org.apache.kafka</groupId>
>>>             <artifactId>kafka_2.12</artifactId>
>>>             <version>2.8.0</version>
>>>         </dependency>
>>>
>>>
>>> They are not needed for using the Kafka connector of Flink (the Flink
>>> Kafka connector dependency pulls in the required dependencies).
>>>
>>>
>>> On Thu, Sep 9, 2021 at 12:02 PM Harshvardhan Shinde <
>>> harshvardhan.shinde@oyorooms.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I'm trying a simple flink job that reads data from a kafka topic and
>>>> creates a Hive table.
>>>>
>>>> I'm following the steps from here
>>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/hive/overview/#connecting-to-hive>
>>>> .
>>>>
>>>> Here's my code:
>>>>
>>>> import org.apache.flink.table.api.EnvironmentSettings;
>>>> import org.apache.flink.table.api.Table;
>>>> import org.apache.flink.table.api.TableEnvironment;
>>>> import org.apache.flink.table.catalog.hive.HiveCatalog;
>>>>
>>>> EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>>>> TableEnvironment tableEnv = TableEnvironment.create(settings);
>>>>
>>>> String name            = "myhive";
>>>> String defaultDatabase = "harsh_test";
>>>> String hiveConfDir     = "/etc/hive/conf";
>>>>
>>>> HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir);
>>>> tableEnv.registerCatalog(name, hive);
>>>>
>>>> // set the HiveCatalog as the current catalog of the session
>>>> tableEnv.useCatalog(name);
>>>>
>>>> tableEnv.executeSql("CREATE TABLE IF NOT EXISTS transactions (\n" +
>>>>       "  `created_at` TIMESTAMP(3) METADATA FROM 'timestamp',\n" +
>>>>       "      `partition` BIGINT METADATA VIRTUAL,\n" +
>>>>       "      `offset` BIGINT METADATA VIRTUAL,\n" +
>>>>       "    account_id  BIGINT,\n" +
>>>>       "    amount      BIGINT,\n" +
>>>>       "    transaction_time TIMESTAMP(3),\n" +
>>>>       "    WATERMARK FOR transaction_time AS transaction_time - INTERVAL '5' SECOND\n" +
>>>>       ") WITH (\n" +
>>>>       "    'connector' = 'kafka',\n" +
>>>>       "    'topic'     = 'flink-stream-table',\n" +
>>>>       "    'properties.bootstrap.servers' = '<BROKER_ADDRESS>:9092',\n" +
>>>>       "   'scan.startup.mode' = 'earliest-offset',\n" +
>>>>       "    'format'    = 'csv'\n" +
>>>>       ")");
>>>>
>>>> Table table = tableEnv.sqlQuery("Select * from transactions");
>>>> table.execute().print();
>>>>
>>>> The code builds successfully, but I'm getting the following runtime
>>>> error:
>>>>
>>>> Caused by: java.util.concurrent.CompletionException: java.lang.NoClassDefFoundError: org/apache/kafka/common/serialization/ByteArrayDeserializer
>>>>     at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
>>>>     at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
>>>>     at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606)
>>>>     ..
>>>>
>>>> Here are my pom.xml file contents:
>>>>
>>>> <!--
>>>> Licensed to the Apache Software Foundation (ASF) under one
>>>> or more contributor license agreements.  See the NOTICE file
>>>> distributed with this work for additional information
>>>> regarding copyright ownership.  The ASF licenses this file
>>>> to you under the Apache License, Version 2.0 (the
>>>> "License"); you may not use this file except in compliance
>>>> with the License.  You may obtain a copy of the License at
>>>>
>>>>   http://www.apache.org/licenses/LICENSE-2.0
>>>>
>>>> Unless required by applicable law or agreed to in writing,
>>>> software distributed under the License is distributed on an
>>>> "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
>>>> KIND, either express or implied.  See the License for the
>>>> specific language governing permissions and limitations
>>>> under the License.
>>>> -->
>>>> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>>>>          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
>>>>     <modelVersion>4.0.0</modelVersion>
>>>>
>>>>     <groupId>com.harsh.test</groupId>
>>>>     <artifactId>harsh-flink-test</artifactId>
>>>>     <version>1.0-SNAPSHOT</version>
>>>>     <packaging>jar</packaging>
>>>>
>>>>     <name>Flink Quickstart Job</name>
>>>>     <url>http://www.myorganization.org</url>
>>>>
>>>>     <properties>
>>>>         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
>>>>         <flink.version>1.13.2</flink.version>
>>>>         <java.version>1.8</java.version>
>>>>         <hive.version>2.3.6</hive.version>
>>>>         <scala.binary.version>2.12</scala.binary.version>
>>>>         <maven.compiler.source>${java.version}</maven.compiler.source>
>>>>         <maven.compiler.target>${java.version}</maven.compiler.target>
>>>>     </properties>
>>>>
>>>>     <repositories>
>>>>         <repository>
>>>>             <id>apache.snapshots</id>
>>>>             <name>Apache Development Snapshot Repository</name>
>>>>             <url>https://repository.apache.org/content/repositories/snapshots/</url>
>>>>             <releases>
>>>>                 <enabled>false</enabled>
>>>>             </releases>
>>>>             <snapshots>
>>>>                 <enabled>true</enabled>
>>>>             </snapshots>
>>>>         </repository>
>>>>     </repositories>
>>>>
>>>>     <dependencies>
>>>>         <!-- Apache Flink dependencies -->
>>>>         <!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
>>>>         <dependency>
>>>>             <groupId>org.apache.flink</groupId>
>>>>             <artifactId>flink-java</artifactId>
>>>>             <version>${flink.version}</version>
>>>>         </dependency>
>>>>         <dependency>
>>>>             <groupId>org.apache.flink</groupId>
>>>>             <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
>>>>             <version>${flink.version}</version>
>>>>         </dependency>
>>>>
>>>>         <!-- Add connector dependencies here. They must be in the default scope (compile). -->
>>>>
>>>>         <!-- Example:
>>>>
>>>>         <dependency>
>>>>             <groupId>org.apache.flink</groupId>
>>>>             <artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
>>>>             <version>${flink.version}</version>
>>>>         </dependency>
>>>>         -->
>>>>
>>>>         <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka -->
>>>>         <dependency>
>>>>             <groupId>org.apache.flink</groupId>
>>>>             <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
>>>>             <version>${flink.version}</version>
>>>>         </dependency>
>>>>
>>>>         <dependency>
>>>>             <groupId>org.apache.flink</groupId>
>>>>             <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
>>>>             <version>${flink.version}</version>
>>>>         </dependency>
>>>>
>>>>         <dependency>
>>>>             <groupId>org.apache.flink</groupId>
>>>>             <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
>>>>             <version>${flink.version}</version>
>>>>         </dependency>
>>>>
>>>>         <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner -->
>>>>         <dependency>
>>>>             <groupId>org.apache.flink</groupId>
>>>>             <artifactId>flink-table-planner_2.12</artifactId>
>>>>             <version>1.13.2</version>
>>>>         </dependency>
>>>>
>>>>
>>>>         <!-- Flink Dependency -->
>>>>         <dependency>
>>>>             <groupId>org.apache.flink</groupId>
>>>>             <artifactId>flink-connector-hive_${scala.binary.version}</artifactId>
>>>>             <version>${flink.version}</version>
>>>>         </dependency>
>>>>
>>>>         <!-- Hive Dependency -->
>>>>         <dependency>
>>>>             <groupId>org.apache.hive</groupId>
>>>>             <artifactId>hive-exec</artifactId>
>>>>             <version>${hive.version}</version>
>>>>         </dependency>
>>>>
>>>>         <!-- https://mvnrepository.com/artifact/javax.servlet/javax.servlet-api -->
>>>>         <dependency>
>>>>             <groupId>javax.servlet</groupId>
>>>>             <artifactId>javax.servlet-api</artifactId>
>>>>             <version>3.1.0</version>
>>>>             <scope>provided</scope>
>>>>         </dependency>
>>>>
>>>>         <!-- https://mvnrepository.com/artifact/org.apache.htrace/htrace-core4 -->
>>>>         <dependency>
>>>>             <groupId>org.apache.htrace</groupId>
>>>>             <artifactId>htrace-core4</artifactId>
>>>>             <version>4.0.1-incubating</version>
>>>>         </dependency>
>>>>
>>>>         <!-- https://mvnrepository.com/artifact/commons-configuration/commons-configuration -->
>>>>         <dependency>
>>>>             <groupId>commons-configuration</groupId>
>>>>             <artifactId>commons-configuration</artifactId>
>>>>             <version>1.10</version>
>>>>         </dependency>
>>>>
>>>>         <!-- https://mvnrepository.com/artifact/commons-logging/commons-logging -->
>>>>         <dependency>
>>>>             <groupId>commons-logging</groupId>
>>>>             <artifactId>commons-logging</artifactId>
>>>>             <version>1.2</version>
>>>>         </dependency>
>>>>
>>>>         <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-shaded-hadoop-2 -->
>>>>         <dependency>
>>>>             <groupId>org.apache.flink</groupId>
>>>>             <artifactId>flink-shaded-hadoop-2</artifactId>
>>>>             <version>2.8.3-10.0</version>
>>>>         </dependency>
>>>>
>>>>
>>>>         <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-hadoop-compatibility -->
>>>>         <dependency>
>>>>             <groupId>org.apache.flink</groupId>
>>>>             <artifactId>flink-hadoop-compatibility_2.12</artifactId>
>>>>             <version>1.13.2</version>
>>>>         </dependency>
>>>>
>>>>         <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-hadoop-fs -->
>>>>         <dependency>
>>>>             <groupId>org.apache.flink</groupId>
>>>>             <artifactId>flink-hadoop-fs</artifactId>
>>>>             <version>1.13.2</version>
>>>>         </dependency>
>>>>
>>>>         <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-csv -->
>>>>         <dependency>
>>>>             <groupId>org.apache.flink</groupId>
>>>>             <artifactId>flink-csv</artifactId>
>>>>             <version>1.13.2</version>
>>>>         </dependency>
>>>>
>>>>         <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-json -->
>>>>         <dependency>
>>>>             <groupId>org.apache.flink</groupId>
>>>>             <artifactId>flink-json</artifactId>
>>>>             <version>1.13.2</version>
>>>>         </dependency>
>>>>
>>>>         <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-sql-connector-hive-2.3.6 -->
>>>>         <dependency>
>>>>             <groupId>org.apache.flink</groupId>
>>>>             <artifactId>flink-sql-connector-hive-2.3.6_2.12</artifactId>
>>>>             <version>1.13.2</version>
>>>>         </dependency>
>>>>
>>>>         <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-core -->
>>>>         <dependency>
>>>>             <groupId>org.apache.flink</groupId>
>>>>             <artifactId>flink-core</artifactId>
>>>>             <version>1.13.2</version>
>>>>         </dependency>
>>>>
>>>>         <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
>>>>         <dependency>
>>>>             <groupId>org.apache.kafka</groupId>
>>>>             <artifactId>kafka-clients</artifactId>
>>>>             <version>2.8.0</version>
>>>>         </dependency>
>>>>
>>>>         <dependency>
>>>>             <groupId>org.apache.kafka</groupId>
>>>>             <artifactId>kafka_2.12</artifactId>
>>>>             <version>2.8.0</version>
>>>>         </dependency>
>>>>
>>>>
>>>>         <!-- Add logging framework, to produce console output when running in the IDE. -->
>>>>         <!-- These dependencies are excluded from the application JAR by default. -->
>>>>         <dependency>
>>>>             <groupId>org.slf4j</groupId>
>>>>             <artifactId>slf4j-log4j12</artifactId>
>>>>             <version>1.7.7</version>
>>>>             <scope>runtime</scope>
>>>>         </dependency>
>>>>         <dependency>
>>>>             <groupId>log4j</groupId>
>>>>             <artifactId>log4j</artifactId>
>>>>             <version>1.2.17</version>
>>>>             <scope>runtime</scope>
>>>>         </dependency>
>>>>     </dependencies>
>>>>
>>>>     <build>
>>>>         <plugins>
>>>>
>>>>             <!-- Java Compiler -->
>>>>             <plugin>
>>>>                 <groupId>org.apache.maven.plugins</groupId>
>>>>                 <artifactId>maven-compiler-plugin</artifactId>
>>>>                 <version>3.1</version>
>>>>                 <configuration>
>>>>                     <source>${java.version}</source>
>>>>                     <target>${java.version}</target>
>>>>                 </configuration>
>>>>             </plugin>
>>>>
>>>>             <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
>>>>             <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
>>>>             <plugin>
>>>>                 <groupId>org.apache.maven.plugins</groupId>
>>>>                 <artifactId>maven-shade-plugin</artifactId>
>>>>                 <version>3.0.0</version>
>>>>                 <executions>
>>>>                     <!-- Run shade goal on package phase -->
>>>>                     <execution>
>>>>                         <phase>package</phase>
>>>>                         <goals>
>>>>                             <goal>shade</goal>
>>>>                         </goals>
>>>>                         <configuration>
>>>>                             <artifactSet>
>>>>                                 <excludes>
>>>>                                     <exclude>org.apache.flink:force-shading</exclude>
>>>>                                     <exclude>com.google.code.findbugs:jsr305</exclude>
>>>>                                     <exclude>org.slf4j:*</exclude>
>>>>                                     <exclude>log4j:*</exclude>
>>>>                                 </excludes>
>>>>                             </artifactSet>
>>>>                             <filters>
>>>>                                 <filter>
>>>>                                     <!-- Do not copy the signatures in the META-INF folder.
>>>>                                     Otherwise, this might cause SecurityExceptions when using the JAR. -->
>>>>                                     <artifact>*:*</artifact>
>>>>                                     <excludes>
>>>>                                         <exclude>META-INF/*.SF</exclude>
>>>>                                         <exclude>META-INF/*.DSA</exclude>
>>>>                                         <exclude>META-INF/*.RSA</exclude>
>>>>                                     </excludes>
>>>>                                 </filter>
>>>>                             </filters>
>>>>                             <transformers>
>>>>                                 <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
>>>>                                     <mainClass>com.harsh.test.StreamingJob</mainClass>
>>>>                                 </transformer>
>>>>                             </transformers>
>>>>                         </configuration>
>>>>                     </execution>
>>>>                 </executions>
>>>>             </plugin>
>>>>         </plugins>
>>>>
>>>>         <pluginManagement>
>>>>             <plugins>
>>>>
>>>>                 <!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. -->
>>>>                 <plugin>
>>>>                     <groupId>org.eclipse.m2e</groupId>
>>>>                     <artifactId>lifecycle-mapping</artifactId>
>>>>                     <version>1.0.0</version>
>>>>                     <configuration>
>>>>                         <lifecycleMappingMetadata>
>>>>                             <pluginExecutions>
>>>>                                 <pluginExecution>
>>>>                                     <pluginExecutionFilter>
>>>>                                         <groupId>org.apache.maven.plugins</groupId>
>>>>                                         <artifactId>maven-shade-plugin</artifactId>
>>>>                                         <versionRange>[3.0.0,)</versionRange>
>>>>                                         <goals>
>>>>                                             <goal>shade</goal>
>>>>                                         </goals>
>>>>                                     </pluginExecutionFilter>
>>>>                                     <action>
>>>>                                         <ignore/>
>>>>                                     </action>
>>>>                                 </pluginExecution>
>>>>                                 <pluginExecution>
>>>>                                     <pluginExecutionFilter>
>>>>                                         <groupId>org.apache.maven.plugins</groupId>
>>>>                                         <artifactId>maven-compiler-plugin</artifactId>
>>>>                                         <versionRange>[3.1,)</versionRange>
>>>>                                         <goals>
>>>>                                             <goal>testCompile</goal>
>>>>                                             <goal>compile</goal>
>>>>                                         </goals>
>>>>                                     </pluginExecutionFilter>
>>>>                                     <action>
>>>>                                         <ignore/>
>>>>                                     </action>
>>>>                                 </pluginExecution>
>>>>                             </pluginExecutions>
>>>>                         </lifecycleMappingMetadata>
>>>>                     </configuration>
>>>>                 </plugin>
>>>>             </plugins>
>>>>         </pluginManagement>
>>>>     </build>
>>>>
>>>>     <!-- This profile helps to make things run out of the box in IntelliJ -->
>>>>     <!-- It adds Flink's core classes to the runtime class path. -->
>>>>     <!-- Otherwise they are missing in IntelliJ, because the dependency is 'provided' -->
>>>>     <profiles>
>>>>         <profile>
>>>>             <id>add-dependencies-for-IDEA</id>
>>>>
>>>>             <activation>
>>>>                 <property>
>>>>                     <name>idea.version</name>
>>>>                 </property>
>>>>             </activation>
>>>>
>>>>             <dependencies>
>>>>                 <dependency>
>>>>                     <groupId>org.apache.flink</groupId>
>>>>                     <artifactId>flink-java</artifactId>
>>>>                     <version>${flink.version}</version>
>>>>                     <scope>compile</scope>
>>>>                 </dependency>
>>>>                 <dependency>
>>>>                     <groupId>org.apache.flink</groupId>
>>>>                     <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
>>>>                     <version>${flink.version}</version>
>>>>                     <scope>compile</scope>
>>>>                 </dependency>
>>>>             </dependencies>
>>>>         </profile>
>>>>     </profiles>
>>>>
>>>> </project>
>>>>
>>>>
>>>> Please help me resolve the issue.
>>>>
>>>> Thanks
>>>>
>>>>
>>>>
>>>>
>>
>> --
>> Thanks and Regards,
>> Harshvardhan
>> Data Platform
>>
>

-- 
Thanks and Regards,
Harshvardhan
Data Platform

Re: Issue while creating Hive table from Kafka topic

Posted by Robert Metzger <rm...@apache.org>.
Can you share the full stack trace, not just a part of it?

On Thu, Sep 9, 2021 at 1:43 PM Harshvardhan Shinde <
harshvardhan.shinde@oyorooms.com> wrote:

> Hi,
>
> I added the dependencies while trying to resolve the same issue; I thought I
> was missing them.
>
> Thanks
>
> On Thu, Sep 9, 2021 at 4:26 PM Robert Metzger <rm...@apache.org> wrote:
>
>> Hey,
>>
>> Why do you have these dependencies in your pom?
>>
>>         <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
>>         <dependency>
>>             <groupId>org.apache.kafka</groupId>
>>             <artifactId>kafka-clients</artifactId>
>>             <version>2.8.0</version>
>>         </dependency>
>>
>>         <dependency>
>>             <groupId>org.apache.kafka</groupId>
>>             <artifactId>kafka_2.12</artifactId>
>>             <version>2.8.0</version>
>>         </dependency>
>>
>>
>> They are not needed for using the Kafka connector of Flink (the Flink
>> Kafka connector dependency pulls in the required dependencies).
>>
>>
>> On Thu, Sep 9, 2021 at 12:02 PM Harshvardhan Shinde <
>> harshvardhan.shinde@oyorooms.com> wrote:
>>
>>> Hi,
>>>
>>> I'm trying a simple flink job that reads data from a kafka topic and
>>> creates a Hive table.
>>>
>>> I'm following the steps from here
>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/hive/overview/#connecting-to-hive>
>>> .
>>>
>>> Here's my code:
>>>
>>> import org.apache.flink.table.api.EnvironmentSettings;
>>> import org.apache.flink.table.api.Table;
>>> import org.apache.flink.table.api.TableEnvironment;
>>> import org.apache.flink.table.catalog.hive.HiveCatalog;
>>>
>>> EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>>> TableEnvironment tableEnv = TableEnvironment.create(settings);
>>>
>>> String name            = "myhive";
>>> String defaultDatabase = "harsh_test";
>>> String hiveConfDir     = "/etc/hive/conf";
>>>
>>> HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir);
>>> tableEnv.registerCatalog(name, hive);
>>>
>>> // set the HiveCatalog as the current catalog of the session
>>> tableEnv.useCatalog(name);
>>>
>>> tableEnv.executeSql("CREATE TABLE IF NOT EXISTS transactions (\n" +
>>>       "  `created_at` TIMESTAMP(3) METADATA FROM 'timestamp',\n" +
>>>       "      `partition` BIGINT METADATA VIRTUAL,\n" +
>>>       "      `offset` BIGINT METADATA VIRTUAL,\n" +
>>>       "    account_id  BIGINT,\n" +
>>>       "    amount      BIGINT,\n" +
>>>       "    transaction_time TIMESTAMP(3),\n" +
>>>       "    WATERMARK FOR transaction_time AS transaction_time - INTERVAL '5' SECOND\n" +
>>>       ") WITH (\n" +
>>>       "    'connector' = 'kafka',\n" +
>>>       "    'topic'     = 'flink-stream-table',\n" +
>>>       "    'properties.bootstrap.servers' = '<BROKER_ADDRESS>:9092',\n" +
>>>       "   'scan.startup.mode' = 'earliest-offset',\n" +
>>>       "    'format'    = 'csv'\n" +
>>>       ")");
>>>
>>> Table table = tableEnv.sqlQuery("Select * from transactions");
>>> table.execute().print();
>>>
>>> The code builds successfully, but I'm getting the following runtime
>>> error:
>>>
>>> Caused by: java.util.concurrent.CompletionException:
>>> java.lang.NoClassDefFoundError:
>>> org/apache/kafka/common/serialization/ByteArrayDeserializer at
>>> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
>>> at
>>> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
>>> at
>>> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606)
>>> ..
>>>
>>> Here are my pom.xml file contents:
>>>
>>> <!--
>>> Licensed to the Apache Software Foundation (ASF) under one
>>> or more contributor license agreements.  See the NOTICE file
>>> distributed with this work for additional information
>>> regarding copyright ownership.  The ASF licenses this file
>>> to you under the Apache License, Version 2.0 (the
>>> "License"); you may not use this file except in compliance
>>> with the License.  You may obtain a copy of the License at
>>>
>>>   http://www.apache.org/licenses/LICENSE-2.0
>>>
>>> Unless required by applicable law or agreed to in writing,
>>> software distributed under the License is distributed on an
>>> "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
>>> KIND, either express or implied.  See the License for the
>>> specific language governing permissions and limitations
>>> under the License.
>>> -->
>>> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>>>          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
>>>     <modelVersion>4.0.0</modelVersion>
>>>
>>>     <groupId>com.harsh.test</groupId>
>>>     <artifactId>harsh-flink-test</artifactId>
>>>     <version>1.0-SNAPSHOT</version>
>>>     <packaging>jar</packaging>
>>>
>>>     <name>Flink Quickstart Job</name>
>>>     <url>http://www.myorganization.org</url>
>>>
>>>     <properties>
>>>         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
>>>         <flink.version>1.13.2</flink.version>
>>>         <java.version>1.8</java.version>
>>>         <hive.version>2.3.6</hive.version>
>>>         <scala.binary.version>2.12</scala.binary.version>
>>>         <maven.compiler.source>${java.version}</maven.compiler.source>
>>>         <maven.compiler.target>${java.version}</maven.compiler.target>
>>>     </properties>
>>>
>>>     <repositories>
>>>         <repository>
>>>             <id>apache.snapshots</id>
>>>             <name>Apache Development Snapshot Repository</name>
>>>             <url>https://repository.apache.org/content/repositories/snapshots/</url>
>>>             <releases>
>>>                 <enabled>false</enabled>
>>>             </releases>
>>>             <snapshots>
>>>                 <enabled>true</enabled>
>>>             </snapshots>
>>>         </repository>
>>>     </repositories>
>>>
>>>     <dependencies>
>>>         <!-- Apache Flink dependencies -->
>>>         <!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
>>>         <dependency>
>>>             <groupId>org.apache.flink</groupId>
>>>             <artifactId>flink-java</artifactId>
>>>             <version>${flink.version}</version>
>>>         </dependency>
>>>         <dependency>
>>>             <groupId>org.apache.flink</groupId>
>>>             <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
>>>             <version>${flink.version}</version>
>>>         </dependency>
>>>
>>>         <!-- Add connector dependencies here. They must be in the default scope (compile). -->
>>>
>>>         <!-- Example:
>>>
>>>         <dependency>
>>>             <groupId>org.apache.flink</groupId>
>>>             <artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
>>>             <version>${flink.version}</version>
>>>         </dependency>
>>>         -->
>>>
>>>         <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka -->
>>>         <dependency>
>>>             <groupId>org.apache.flink</groupId>
>>>             <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
>>>             <version>${flink.version}</version>
>>>         </dependency>
>>>
>>>         <dependency>
>>>             <groupId>org.apache.flink</groupId>
>>>             <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
>>>             <version>${flink.version}</version>
>>>         </dependency>
>>>
>>>         <dependency>
>>>             <groupId>org.apache.flink</groupId>
>>>             <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
>>>             <version>${flink.version}</version>
>>>         </dependency>
>>>
>>>         <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner -->
>>>         <dependency>
>>>             <groupId>org.apache.flink</groupId>
>>>             <artifactId>flink-table-planner_2.12</artifactId>
>>>             <version>1.13.2</version>
>>>         </dependency>
>>>
>>>
>>>         <!-- Flink Dependency -->
>>>         <dependency>
>>>             <groupId>org.apache.flink</groupId>
>>>             <artifactId>flink-connector-hive_${scala.binary.version}</artifactId>
>>>             <version>${flink.version}</version>
>>>         </dependency>
>>>
>>>         <!-- Hive Dependency -->
>>>         <dependency>
>>>             <groupId>org.apache.hive</groupId>
>>>             <artifactId>hive-exec</artifactId>
>>>             <version>${hive.version}</version>
>>>         </dependency>
>>>
>>>         <!-- https://mvnrepository.com/artifact/javax.servlet/javax.servlet-api -->
>>>         <dependency>
>>>             <groupId>javax.servlet</groupId>
>>>             <artifactId>javax.servlet-api</artifactId>
>>>             <version>3.1.0</version>
>>>             <scope>provided</scope>
>>>         </dependency>
>>>
>>>         <!-- https://mvnrepository.com/artifact/org.apache.htrace/htrace-core4 -->
>>>         <dependency>
>>>             <groupId>org.apache.htrace</groupId>
>>>             <artifactId>htrace-core4</artifactId>
>>>             <version>4.0.1-incubating</version>
>>>         </dependency>
>>>
>>>         <!-- https://mvnrepository.com/artifact/commons-configuration/commons-configuration -->
>>>         <dependency>
>>>             <groupId>commons-configuration</groupId>
>>>             <artifactId>commons-configuration</artifactId>
>>>             <version>1.10</version>
>>>         </dependency>
>>>
>>>         <!-- https://mvnrepository.com/artifact/commons-logging/commons-logging -->
>>>         <dependency>
>>>             <groupId>commons-logging</groupId>
>>>             <artifactId>commons-logging</artifactId>
>>>             <version>1.2</version>
>>>         </dependency>
>>>
>>>         <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-shaded-hadoop-2 -->
>>>         <dependency>
>>>             <groupId>org.apache.flink</groupId>
>>>             <artifactId>flink-shaded-hadoop-2</artifactId>
>>>             <version>2.8.3-10.0</version>
>>>         </dependency>
>>>
>>>
>>>         <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-hadoop-compatibility -->
>>>         <dependency>
>>>             <groupId>org.apache.flink</groupId>
>>>             <artifactId>flink-hadoop-compatibility_2.12</artifactId>
>>>             <version>1.13.2</version>
>>>         </dependency>
>>>
>>>         <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-hadoop-fs -->
>>>         <dependency>
>>>             <groupId>org.apache.flink</groupId>
>>>             <artifactId>flink-hadoop-fs</artifactId>
>>>             <version>1.13.2</version>
>>>         </dependency>
>>>
>>>         <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-csv -->
>>>         <dependency>
>>>             <groupId>org.apache.flink</groupId>
>>>             <artifactId>flink-csv</artifactId>
>>>             <version>1.13.2</version>
>>>         </dependency>
>>>
>>>         <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-json -->
>>>         <dependency>
>>>             <groupId>org.apache.flink</groupId>
>>>             <artifactId>flink-json</artifactId>
>>>             <version>1.13.2</version>
>>>         </dependency>
>>>
>>>         <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-sql-connector-hive-2.3.6 -->
>>>         <dependency>
>>>             <groupId>org.apache.flink</groupId>
>>>             <artifactId>flink-sql-connector-hive-2.3.6_2.12</artifactId>
>>>             <version>1.13.2</version>
>>>         </dependency>
>>>
>>>         <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-core -->
>>>         <dependency>
>>>             <groupId>org.apache.flink</groupId>
>>>             <artifactId>flink-core</artifactId>
>>>             <version>1.13.2</version>
>>>         </dependency>
>>>
>>>         <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
>>>         <dependency>
>>>             <groupId>org.apache.kafka</groupId>
>>>             <artifactId>kafka-clients</artifactId>
>>>             <version>2.8.0</version>
>>>         </dependency>
>>>
>>>         <dependency>
>>>             <groupId>org.apache.kafka</groupId>
>>>             <artifactId>kafka_2.12</artifactId>
>>>             <version>2.8.0</version>
>>>         </dependency>
>>>
>>>
>>>         <!-- Add logging framework, to produce console output when running in the IDE. -->
>>>         <!-- These dependencies are excluded from the application JAR by default. -->
>>>         <dependency>
>>>             <groupId>org.slf4j</groupId>
>>>             <artifactId>slf4j-log4j12</artifactId>
>>>             <version>1.7.7</version>
>>>             <scope>runtime</scope>
>>>         </dependency>
>>>         <dependency>
>>>             <groupId>log4j</groupId>
>>>             <artifactId>log4j</artifactId>
>>>             <version>1.2.17</version>
>>>             <scope>runtime</scope>
>>>         </dependency>
>>>     </dependencies>
>>>
>>>     <build>
>>>         <plugins>
>>>
>>>             <!-- Java Compiler -->
>>>             <plugin>
>>>                 <groupId>org.apache.maven.plugins</groupId>
>>>                 <artifactId>maven-compiler-plugin</artifactId>
>>>                 <version>3.1</version>
>>>                 <configuration>
>>>                     <source>${java.version}</source>
>>>                     <target>${java.version}</target>
>>>                 </configuration>
>>>             </plugin>
>>>
>>>             <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
>>>             <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
>>>             <plugin>
>>>                 <groupId>org.apache.maven.plugins</groupId>
>>>                 <artifactId>maven-shade-plugin</artifactId>
>>>                 <version>3.0.0</version>
>>>                 <executions>
>>>                     <!-- Run shade goal on package phase -->
>>>                     <execution>
>>>                         <phase>package</phase>
>>>                         <goals>
>>>                             <goal>shade</goal>
>>>                         </goals>
>>>                         <configuration>
>>>                             <artifactSet>
>>>                                 <excludes>
>>>                                     <exclude>org.apache.flink:force-shading</exclude>
>>>                                     <exclude>com.google.code.findbugs:jsr305</exclude>
>>>                                     <exclude>org.slf4j:*</exclude>
>>>                                     <exclude>log4j:*</exclude>
>>>                                 </excludes>
>>>                             </artifactSet>
>>>                             <filters>
>>>                                 <filter>
>>>                                     <!-- Do not copy the signatures in the META-INF folder.
>>>                                     Otherwise, this might cause SecurityExceptions when using the JAR. -->
>>>                                     <artifact>*:*</artifact>
>>>                                     <excludes>
>>>                                         <exclude>META-INF/*.SF</exclude>
>>>                                         <exclude>META-INF/*.DSA</exclude>
>>>                                         <exclude>META-INF/*.RSA</exclude>
>>>                                     </excludes>
>>>                                 </filter>
>>>                             </filters>
>>>                             <transformers>
>>>                                 <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
>>>                                     <mainClass>com.harsh.test.StreamingJob</mainClass>
>>>                                 </transformer>
>>>                             </transformers>
>>>                         </configuration>
>>>                     </execution>
>>>                 </executions>
>>>             </plugin>
>>>         </plugins>
>>>
>>>         <pluginManagement>
>>>             <plugins>
>>>
>>>                 <!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. -->
>>>                 <plugin>
>>>                     <groupId>org.eclipse.m2e</groupId>
>>>                     <artifactId>lifecycle-mapping</artifactId>
>>>                     <version>1.0.0</version>
>>>                     <configuration>
>>>                         <lifecycleMappingMetadata>
>>>                             <pluginExecutions>
>>>                                 <pluginExecution>
>>>                                     <pluginExecutionFilter>
>>>                                         <groupId>org.apache.maven.plugins</groupId>
>>>                                         <artifactId>maven-shade-plugin</artifactId>
>>>                                         <versionRange>[3.0.0,)</versionRange>
>>>                                         <goals>
>>>                                             <goal>shade</goal>
>>>                                         </goals>
>>>                                     </pluginExecutionFilter>
>>>                                     <action>
>>>                                         <ignore/>
>>>                                     </action>
>>>                                 </pluginExecution>
>>>                                 <pluginExecution>
>>>                                     <pluginExecutionFilter>
>>>                                         <groupId>org.apache.maven.plugins</groupId>
>>>                                         <artifactId>maven-compiler-plugin</artifactId>
>>>                                         <versionRange>[3.1,)</versionRange>
>>>                                         <goals>
>>>                                             <goal>testCompile</goal>
>>>                                             <goal>compile</goal>
>>>                                         </goals>
>>>                                     </pluginExecutionFilter>
>>>                                     <action>
>>>                                         <ignore/>
>>>                                     </action>
>>>                                 </pluginExecution>
>>>                             </pluginExecutions>
>>>                         </lifecycleMappingMetadata>
>>>                     </configuration>
>>>                 </plugin>
>>>             </plugins>
>>>         </pluginManagement>
>>>     </build>
>>>
>>>     <!-- This profile helps to make things run out of the box in IntelliJ -->
>>>     <!-- It adds Flink's core classes to the runtime class path. -->
>>>     <!-- Otherwise they are missing in IntelliJ, because the dependency is 'provided' -->
>>>     <profiles>
>>>         <profile>
>>>             <id>add-dependencies-for-IDEA</id>
>>>
>>>             <activation>
>>>                 <property>
>>>                     <name>idea.version</name>
>>>                 </property>
>>>             </activation>
>>>
>>>             <dependencies>
>>>                 <dependency>
>>>                     <groupId>org.apache.flink</groupId>
>>>                     <artifactId>flink-java</artifactId>
>>>                     <version>${flink.version}</version>
>>>                     <scope>compile</scope>
>>>                 </dependency>
>>>                 <dependency>
>>>                     <groupId>org.apache.flink</groupId>
>>>                     <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
>>>                     <version>${flink.version}</version>
>>>                     <scope>compile</scope>
>>>                 </dependency>
>>>             </dependencies>
>>>         </profile>
>>>     </profiles>
>>>
>>> </project>
>>>
>>>
>>> Please help me resolve the issue.
>>>
>>> Thanks
>>>
>>>
>>>
>>>
>
> --
> Thanks and Regards,
> Harshvardhan
> Data Platform
>

Re: Issue while creating Hive table from Kafka topic

Posted by Harshvardhan Shinde <ha...@oyorooms.com>.
Hi,

I added those dependencies while trying to resolve this same issue; I thought
I was missing them.

Thanks

On Thu, Sep 9, 2021 at 4:26 PM Robert Metzger <rm...@apache.org> wrote:

> Hey,
>
> Why do you have these dependencies in your pom?
>
>         <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
>         <dependency>
>             <groupId>org.apache.kafka</groupId>
>             <artifactId>kafka-clients</artifactId>
>             <version>2.8.0</version>
>         </dependency>
>
>         <dependency>
>             <groupId>org.apache.kafka</groupId>
>             <artifactId>kafka_2.12</artifactId>
>             <version>2.8.0</version>
>         </dependency>
>
>
> They are not needed for using Flink's Kafka connector (the Flink
> Kafka connector dependency pulls in the required dependencies).
>
>
> On Thu, Sep 9, 2021 at 12:02 PM Harshvardhan Shinde <
> harshvardhan.shinde@oyorooms.com> wrote:
>
>> Hi,
>>
>> I'm trying a simple flink job that reads data from a kafka topic and
>> creates a Hive table.
>>
>> I'm following the steps from here
>> <https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/hive/overview/#connecting-to-hive>
>> .
>>
>> Here's my code:
>>
>> import org.apache.flink.table.api.EnvironmentSettings;
>> import org.apache.flink.table.api.Table;
>> import org.apache.flink.table.api.TableEnvironment;
>> import org.apache.flink.table.catalog.hive.HiveCatalog;
>>
>> EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>> TableEnvironment tableEnv = TableEnvironment.create(settings);
>>
>> String name            = "myhive";
>> String defaultDatabase = "harsh_test";
>> String hiveConfDir     = "/etc/hive/conf";
>>
>> HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir);
>> tableEnv.registerCatalog(name, hive);
>>
>> // set the HiveCatalog as the current catalog of the session
>> tableEnv.useCatalog(name);
>>
>> tableEnv.executeSql("CREATE TABLE IF NOT EXISTS transactions (\n" +
>>       "  `created_at` TIMESTAMP(3) METADATA FROM 'timestamp',\n" +
>>       "      `partition` BIGINT METADATA VIRTUAL,\n" +
>>       "      `offset` BIGINT METADATA VIRTUAL,\n" +
>>       "    account_id  BIGINT,\n" +
>>       "    amount      BIGINT,\n" +
>>       "    transaction_time TIMESTAMP(3),\n" +
>>       "    WATERMARK FOR transaction_time AS transaction_time - INTERVAL '5' SECOND\n" +
>>       ") WITH (\n" +
>>       "    'connector' = 'kafka',\n" +
>>       "    'topic'     = 'flink-stream-table',\n" +
>>       "    'properties.bootstrap.servers' = '<BROKER_ADDRESS>:9092',\n" +
>>       "   'scan.startup.mode' = 'earliest-offset',\n" +
>>       "    'format'    = 'csv'\n" +
>>       ")");
>>
>> Table table = tableEnv.sqlQuery("Select * from transactions");
>> table.execute().print();
>>
>> The code builds successfully, but I'm getting the following runtime error:
>>
>> Caused by: java.util.concurrent.CompletionException: java.lang.NoClassDefFoundError: org/apache/kafka/common/serialization/ByteArrayDeserializer
>>     at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
>>     at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
>>     at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606)
>>     ...
>>
>> Here are my pom.xml file contents:
>>
>> <!--
>> Licensed to the Apache Software Foundation (ASF) under one
>> or more contributor license agreements.  See the NOTICE file
>> distributed with this work for additional information
>> regarding copyright ownership.  The ASF licenses this file
>> to you under the Apache License, Version 2.0 (the
>> "License"); you may not use this file except in compliance
>> with the License.  You may obtain a copy of the License at
>>
>>   http://www.apache.org/licenses/LICENSE-2.0
>>
>> Unless required by applicable law or agreed to in writing,
>> software distributed under the License is distributed on an
>> "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
>> KIND, either express or implied.  See the License for the
>> specific language governing permissions and limitations
>> under the License.
>> -->
>> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>>          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
>>     <modelVersion>4.0.0</modelVersion>
>>
>>     <groupId>com.harsh.test</groupId>
>>     <artifactId>harsh-flink-test</artifactId>
>>     <version>1.0-SNAPSHOT</version>
>>     <packaging>jar</packaging>
>>
>>     <name>Flink Quickstart Job</name>
>>     <url>http://www.myorganization.org</url>
>>
>>     <properties>
>>         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
>>         <flink.version>1.13.2</flink.version>
>>         <java.version>1.8</java.version>
>>         <hive.version>2.3.6</hive.version>
>>         <scala.binary.version>2.12</scala.binary.version>
>>         <maven.compiler.source>${java.version}</maven.compiler.source>
>>         <maven.compiler.target>${java.version}</maven.compiler.target>
>>     </properties>
>>
>>     <repositories>
>>         <repository>
>>             <id>apache.snapshots</id>
>>             <name>Apache Development Snapshot Repository</name>
>>             <url>https://repository.apache.org/content/repositories/snapshots/</url>
>>             <releases>
>>                 <enabled>false</enabled>
>>             </releases>
>>             <snapshots>
>>                 <enabled>true</enabled>
>>             </snapshots>
>>         </repository>
>>     </repositories>
>>
>>     <dependencies>
>>         <!-- Apache Flink dependencies -->
>>         <!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
>>         <dependency>
>>             <groupId>org.apache.flink</groupId>
>>             <artifactId>flink-java</artifactId>
>>             <version>${flink.version}</version>
>>         </dependency>
>>         <dependency>
>>             <groupId>org.apache.flink</groupId>
>>             <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
>>             <version>${flink.version}</version>
>>         </dependency>
>>
>>         <!-- Add connector dependencies here. They must be in the default scope (compile). -->
>>
>>         <!-- Example:
>>
>>         <dependency>
>>             <groupId>org.apache.flink</groupId>
>>             <artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
>>             <version>${flink.version}</version>
>>         </dependency>
>>         -->
>>
>>         <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka -->
>>         <dependency>
>>             <groupId>org.apache.flink</groupId>
>>             <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
>>             <version>${flink.version}</version>
>>         </dependency>
>>
>>         <dependency>
>>             <groupId>org.apache.flink</groupId>
>>             <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
>>             <version>${flink.version}</version>
>>         </dependency>
>>
>>         <dependency>
>>             <groupId>org.apache.flink</groupId>
>>             <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
>>             <version>${flink.version}</version>
>>         </dependency>
>>
>>         <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner -->
>>         <dependency>
>>             <groupId>org.apache.flink</groupId>
>>             <artifactId>flink-table-planner_2.12</artifactId>
>>             <version>1.13.2</version>
>>         </dependency>
>>
>>
>>         <!-- Flink Dependency -->
>>         <dependency>
>>             <groupId>org.apache.flink</groupId>
>>             <artifactId>flink-connector-hive_${scala.binary.version}</artifactId>
>>             <version>${flink.version}</version>
>>         </dependency>
>>
>>         <!-- Hive Dependency -->
>>         <dependency>
>>             <groupId>org.apache.hive</groupId>
>>             <artifactId>hive-exec</artifactId>
>>             <version>${hive.version}</version>
>>         </dependency>
>>
>>         <!-- https://mvnrepository.com/artifact/javax.servlet/javax.servlet-api -->
>>         <dependency>
>>             <groupId>javax.servlet</groupId>
>>             <artifactId>javax.servlet-api</artifactId>
>>             <version>3.1.0</version>
>>             <scope>provided</scope>
>>         </dependency>
>>
>>         <!-- https://mvnrepository.com/artifact/org.apache.htrace/htrace-core4 -->
>>         <dependency>
>>             <groupId>org.apache.htrace</groupId>
>>             <artifactId>htrace-core4</artifactId>
>>             <version>4.0.1-incubating</version>
>>         </dependency>
>>
>>         <!-- https://mvnrepository.com/artifact/commons-configuration/commons-configuration -->
>>         <dependency>
>>             <groupId>commons-configuration</groupId>
>>             <artifactId>commons-configuration</artifactId>
>>             <version>1.10</version>
>>         </dependency>
>>
>>         <!-- https://mvnrepository.com/artifact/commons-logging/commons-logging -->
>>         <dependency>
>>             <groupId>commons-logging</groupId>
>>             <artifactId>commons-logging</artifactId>
>>             <version>1.2</version>
>>         </dependency>
>>
>>         <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-shaded-hadoop-2 -->
>>         <dependency>
>>             <groupId>org.apache.flink</groupId>
>>             <artifactId>flink-shaded-hadoop-2</artifactId>
>>             <version>2.8.3-10.0</version>
>>         </dependency>
>>
>>
>>         <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-hadoop-compatibility -->
>>         <dependency>
>>             <groupId>org.apache.flink</groupId>
>>             <artifactId>flink-hadoop-compatibility_2.12</artifactId>
>>             <version>1.13.2</version>
>>         </dependency>
>>
>>         <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-hadoop-fs -->
>>         <dependency>
>>             <groupId>org.apache.flink</groupId>
>>             <artifactId>flink-hadoop-fs</artifactId>
>>             <version>1.13.2</version>
>>         </dependency>
>>
>>         <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-csv -->
>>         <dependency>
>>             <groupId>org.apache.flink</groupId>
>>             <artifactId>flink-csv</artifactId>
>>             <version>1.13.2</version>
>>         </dependency>
>>
>>         <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-json -->
>>         <dependency>
>>             <groupId>org.apache.flink</groupId>
>>             <artifactId>flink-json</artifactId>
>>             <version>1.13.2</version>
>>         </dependency>
>>
>>         <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-sql-connector-hive-1.2.2 -->
>>         <dependency>
>>             <groupId>org.apache.flink</groupId>
>>             <artifactId>flink-sql-connector-hive-2.3.6_2.12</artifactId>
>>             <version>1.13.2</version>
>>         </dependency>
>>
>>         <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-core -->
>>         <dependency>
>>             <groupId>org.apache.flink</groupId>
>>             <artifactId>flink-core</artifactId>
>>             <version>1.13.2</version>
>>         </dependency>
>>
>>         <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
>>         <dependency>
>>             <groupId>org.apache.kafka</groupId>
>>             <artifactId>kafka-clients</artifactId>
>>             <version>2.8.0</version>
>>         </dependency>
>>
>>         <dependency>
>>             <groupId>org.apache.kafka</groupId>
>>             <artifactId>kafka_2.12</artifactId>
>>             <version>2.8.0</version>
>>         </dependency>
>>
>>
>>         <!-- Add logging framework, to produce console output when running in the IDE. -->
>>         <!-- These dependencies are excluded from the application JAR by default. -->
>>         <dependency>
>>             <groupId>org.slf4j</groupId>
>>             <artifactId>slf4j-log4j12</artifactId>
>>             <version>1.7.7</version>
>>             <scope>runtime</scope>
>>         </dependency>
>>         <dependency>
>>             <groupId>log4j</groupId>
>>             <artifactId>log4j</artifactId>
>>             <version>1.2.17</version>
>>             <scope>runtime</scope>
>>         </dependency>
>>     </dependencies>
>>
>>     <build>
>>         <plugins>
>>
>>             <!-- Java Compiler -->
>>             <plugin>
>>                 <groupId>org.apache.maven.plugins</groupId>
>>                 <artifactId>maven-compiler-plugin</artifactId>
>>                 <version>3.1</version>
>>                 <configuration>
>>                     <source>${java.version}</source>
>>                     <target>${java.version}</target>
>>                 </configuration>
>>             </plugin>
>>
>>             <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
>>             <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
>>             <plugin>
>>                 <groupId>org.apache.maven.plugins</groupId>
>>                 <artifactId>maven-shade-plugin</artifactId>
>>                 <version>3.0.0</version>
>>                 <executions>
>>                     <!-- Run shade goal on package phase -->
>>                     <execution>
>>                         <phase>package</phase>
>>                         <goals>
>>                             <goal>shade</goal>
>>                         </goals>
>>                         <configuration>
>>                             <artifactSet>
>>                                 <excludes>
>>                                     <exclude>org.apache.flink:force-shading</exclude>
>>                                     <exclude>com.google.code.findbugs:jsr305</exclude>
>>                                     <exclude>org.slf4j:*</exclude>
>>                                     <exclude>log4j:*</exclude>
>>                                 </excludes>
>>                             </artifactSet>
>>                             <filters>
>>                                 <filter>
>>                                     <!-- Do not copy the signatures in the META-INF folder.
>>                                     Otherwise, this might cause SecurityExceptions when using the JAR. -->
>>                                     <artifact>*:*</artifact>
>>                                     <excludes>
>>                                         <exclude>META-INF/*.SF</exclude>
>>                                         <exclude>META-INF/*.DSA</exclude>
>>                                         <exclude>META-INF/*.RSA</exclude>
>>                                     </excludes>
>>                                 </filter>
>>                             </filters>
>>                             <transformers>
>>                                 <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
>>                                     <mainClass>com.harsh.test.StreamingJob</mainClass>
>>                                 </transformer>
>>                             </transformers>
>>                         </configuration>
>>                     </execution>
>>                 </executions>
>>             </plugin>
>>         </plugins>
>>
>>         <pluginManagement>
>>             <plugins>
>>
>>                 <!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. -->
>>                 <plugin>
>>                     <groupId>org.eclipse.m2e</groupId>
>>                     <artifactId>lifecycle-mapping</artifactId>
>>                     <version>1.0.0</version>
>>                     <configuration>
>>                         <lifecycleMappingMetadata>
>>                             <pluginExecutions>
>>                                 <pluginExecution>
>>                                     <pluginExecutionFilter>
>>                                         <groupId>org.apache.maven.plugins</groupId>
>>                                         <artifactId>maven-shade-plugin</artifactId>
>>                                         <versionRange>[3.0.0,)</versionRange>
>>                                         <goals>
>>                                             <goal>shade</goal>
>>                                         </goals>
>>                                     </pluginExecutionFilter>
>>                                     <action>
>>                                         <ignore/>
>>                                     </action>
>>                                 </pluginExecution>
>>                                 <pluginExecution>
>>                                     <pluginExecutionFilter>
>>                                         <groupId>org.apache.maven.plugins</groupId>
>>                                         <artifactId>maven-compiler-plugin</artifactId>
>>                                         <versionRange>[3.1,)</versionRange>
>>                                         <goals>
>>                                             <goal>testCompile</goal>
>>                                             <goal>compile</goal>
>>                                         </goals>
>>                                     </pluginExecutionFilter>
>>                                     <action>
>>                                         <ignore/>
>>                                     </action>
>>                                 </pluginExecution>
>>                             </pluginExecutions>
>>                         </lifecycleMappingMetadata>
>>                     </configuration>
>>                 </plugin>
>>             </plugins>
>>         </pluginManagement>
>>     </build>
>>
>>     <!-- This profile helps to make things run out of the box in IntelliJ -->
>>     <!-- It adds Flink's core classes to the runtime class path. -->
>>     <!-- Otherwise they are missing in IntelliJ, because the dependency is 'provided' -->
>>     <profiles>
>>         <profile>
>>             <id>add-dependencies-for-IDEA</id>
>>
>>             <activation>
>>                 <property>
>>                     <name>idea.version</name>
>>                 </property>
>>             </activation>
>>
>>             <dependencies>
>>                 <dependency>
>>                     <groupId>org.apache.flink</groupId>
>>                     <artifactId>flink-java</artifactId>
>>                     <version>${flink.version}</version>
>>                     <scope>compile</scope>
>>                 </dependency>
>>                 <dependency>
>>                     <groupId>org.apache.flink</groupId>
>>                     <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
>>                     <version>${flink.version}</version>
>>                     <scope>compile</scope>
>>                 </dependency>
>>             </dependencies>
>>         </profile>
>>     </profiles>
>>
>> </project>
>>
>>
>> Please help me resolve the issue.
>>
>> Thanks
>>
>>
>>
>>

-- 
Thanks and Regards,
Harshvardhan
Data Platform

Re: Issue while creating Hive table from Kafka topic

Posted by Robert Metzger <rm...@apache.org>.
Hey,

Why do you have these dependencies in your pom?

        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.8.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.12</artifactId>
            <version>2.8.0</version>
        </dependency>


They are not needed for using Flink's Kafka connector (the Flink Kafka
connector dependency pulls in the required dependencies).


On Thu, Sep 9, 2021 at 12:02 PM Harshvardhan Shinde <
harshvardhan.shinde@oyorooms.com> wrote:

> Hi,
>
> I'm trying a simple flink job that reads data from a kafka topic and
> creates a Hive table.
>
> I'm following the steps from here
> <https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/hive/overview/#connecting-to-hive>
> .
>
> Here's my code:
>
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.TableEnvironment;
> import org.apache.flink.table.catalog.hive.HiveCatalog;
>
> EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> TableEnvironment tableEnv = TableEnvironment.create(settings);
>
> String name            = "myhive";
> String defaultDatabase = "harsh_test";
> String hiveConfDir     = "/etc/hive/conf";
>
> HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir);
> tableEnv.registerCatalog(name, hive);
>
> // set the HiveCatalog as the current catalog of the session
> tableEnv.useCatalog(name);
>
> tableEnv.executeSql("CREATE TABLE IF NOT EXISTS transactions (\n" +
>       "  `created_at` TIMESTAMP(3) METADATA FROM 'timestamp',\n" +
>       "      `partition` BIGINT METADATA VIRTUAL,\n" +
>       "      `offset` BIGINT METADATA VIRTUAL,\n" +
>       "    account_id  BIGINT,\n" +
>       "    amount      BIGINT,\n" +
>       "    transaction_time TIMESTAMP(3),\n" +
>       "    WATERMARK FOR transaction_time AS transaction_time - INTERVAL '5' SECOND\n" +
>       ") WITH (\n" +
>       "    'connector' = 'kafka',\n" +
>       "    'topic'     = 'flink-stream-table',\n" +
>       "    'properties.bootstrap.servers' = '<BROKER_ADDRESS>:9092',\n" +
>       "   'scan.startup.mode' = 'earliest-offset',\n" +
>       "    'format'    = 'csv'\n" +
>       ")");
>
> Table table = tableEnv.sqlQuery("Select * from transactions");
> table.execute().print();
>
> The code builds successfully, but I'm getting the following runtime error:
>
> Caused by: java.util.concurrent.CompletionException: java.lang.NoClassDefFoundError: org/apache/kafka/common/serialization/ByteArrayDeserializer
>     at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
>     at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
>     at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606)
>     ...
>
> Here are my pom.xml file contents:
>
> <!--
> Licensed to the Apache Software Foundation (ASF) under one
> or more contributor license agreements.  See the NOTICE file
> distributed with this work for additional information
> regarding copyright ownership.  The ASF licenses this file
> to you under the Apache License, Version 2.0 (the
> "License"); you may not use this file except in compliance
> with the License.  You may obtain a copy of the License at
>
>   http://www.apache.org/licenses/LICENSE-2.0
>
> Unless required by applicable law or agreed to in writing,
> software distributed under the License is distributed on an
> "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
> KIND, either express or implied.  See the License for the
> specific language governing permissions and limitations
> under the License.
> -->
> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
>     <modelVersion>4.0.0</modelVersion>
>
>     <groupId>com.harsh.test</groupId>
>     <artifactId>harsh-flink-test</artifactId>
>     <version>1.0-SNAPSHOT</version>
>     <packaging>jar</packaging>
>
>     <name>Flink Quickstart Job</name>
>     <url>http://www.myorganization.org</url>
>
>     <properties>
>         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
>         <flink.version>1.13.2</flink.version>
>         <java.version>1.8</java.version>
>         <hive.version>2.3.6</hive.version>
>         <scala.binary.version>2.12</scala.binary.version>
>         <maven.compiler.source>${java.version}</maven.compiler.source>
>         <maven.compiler.target>${java.version}</maven.compiler.target>
>     </properties>
>
>     <repositories>
>         <repository>
>             <id>apache.snapshots</id>
>             <name>Apache Development Snapshot Repository</name>
>             <url>https://repository.apache.org/content/repositories/snapshots/</url>
>             <releases>
>                 <enabled>false</enabled>
>             </releases>
>             <snapshots>
>                 <enabled>true</enabled>
>             </snapshots>
>         </repository>
>     </repositories>
>
>     <dependencies>
>         <!-- Apache Flink dependencies -->
>         <!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
>         <dependency>
>             <groupId>org.apache.flink</groupId>
>             <artifactId>flink-java</artifactId>
>             <version>${flink.version}</version>
>         </dependency>
>         <dependency>
>             <groupId>org.apache.flink</groupId>
>             <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
>             <version>${flink.version}</version>
>         </dependency>
>
>         <!-- Add connector dependencies here. They must be in the default scope (compile). -->
>
>         <!-- Example:
>
>         <dependency>
>             <groupId>org.apache.flink</groupId>
>             <artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
>             <version>${flink.version}</version>
>         </dependency>
>         -->
>
>         <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka -->
>         <dependency>
>             <groupId>org.apache.flink</groupId>
>             <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
>             <version>${flink.version}</version>
>         </dependency>
>
>         <dependency>
>             <groupId>org.apache.flink</groupId>
>             <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
>             <version>${flink.version}</version>
>         </dependency>
>
>         <dependency>
>             <groupId>org.apache.flink</groupId>
>             <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
>             <version>${flink.version}</version>
>         </dependency>
>
>         <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner -->
>         <dependency>
>             <groupId>org.apache.flink</groupId>
>             <artifactId>flink-table-planner_2.12</artifactId>
>             <version>1.13.2</version>
>         </dependency>
>
>
>         <!-- Flink Dependency -->
>         <dependency>
>             <groupId>org.apache.flink</groupId>
>             <artifactId>flink-connector-hive_${scala.binary.version}</artifactId>
>             <version>${flink.version}</version>
>         </dependency>
>
>         <!-- Hive Dependency -->
>         <dependency>
>             <groupId>org.apache.hive</groupId>
>             <artifactId>hive-exec</artifactId>
>             <version>${hive.version}</version>
>         </dependency>
>
>         <!-- https://mvnrepository.com/artifact/javax.servlet/javax.servlet-api -->
>         <dependency>
>             <groupId>javax.servlet</groupId>
>             <artifactId>javax.servlet-api</artifactId>
>             <version>3.1.0</version>
>             <scope>provided</scope>
>         </dependency>
>
>         <!-- https://mvnrepository.com/artifact/org.apache.htrace/htrace-core4 -->
>         <dependency>
>             <groupId>org.apache.htrace</groupId>
>             <artifactId>htrace-core4</artifactId>
>             <version>4.0.1-incubating</version>
>         </dependency>
>
>         <!-- https://mvnrepository.com/artifact/commons-configuration/commons-configuration -->
>         <dependency>
>             <groupId>commons-configuration</groupId>
>             <artifactId>commons-configuration</artifactId>
>             <version>1.10</version>
>         </dependency>
>
>         <!-- https://mvnrepository.com/artifact/commons-logging/commons-logging -->
>         <dependency>
>             <groupId>commons-logging</groupId>
>             <artifactId>commons-logging</artifactId>
>             <version>1.2</version>
>         </dependency>
>
>         <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-shaded-hadoop-2 -->
>         <dependency>
>             <groupId>org.apache.flink</groupId>
>             <artifactId>flink-shaded-hadoop-2</artifactId>
>             <version>2.8.3-10.0</version>
>         </dependency>
>
>
>         <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-hadoop-compatibility -->
>         <dependency>
>             <groupId>org.apache.flink</groupId>
>             <artifactId>flink-hadoop-compatibility_2.12</artifactId>
>             <version>1.13.2</version>
>         </dependency>
>
>         <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-hadoop-fs -->
>         <dependency>
>             <groupId>org.apache.flink</groupId>
>             <artifactId>flink-hadoop-fs</artifactId>
>             <version>1.13.2</version>
>         </dependency>
>
>         <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-csv -->
>         <dependency>
>             <groupId>org.apache.flink</groupId>
>             <artifactId>flink-csv</artifactId>
>             <version>1.13.2</version>
>         </dependency>
>
>         <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-json -->
>         <dependency>
>             <groupId>org.apache.flink</groupId>
>             <artifactId>flink-json</artifactId>
>             <version>1.13.2</version>
>         </dependency>
>
>         <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-sql-connector-hive-1.2.2 -->
>         <dependency>
>             <groupId>org.apache.flink</groupId>
>             <artifactId>flink-sql-connector-hive-2.3.6_2.12</artifactId>
>             <version>1.13.2</version>
>         </dependency>
>
>         <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-core -->
>         <dependency>
>             <groupId>org.apache.flink</groupId>
>             <artifactId>flink-core</artifactId>
>             <version>1.13.2</version>
>         </dependency>
>
>         <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
>         <dependency>
>             <groupId>org.apache.kafka</groupId>
>             <artifactId>kafka-clients</artifactId>
>             <version>2.8.0</version>
>         </dependency>
>
>         <dependency>
>             <groupId>org.apache.kafka</groupId>
>             <artifactId>kafka_2.12</artifactId>
>             <version>2.8.0</version>
>         </dependency>
>
>
>         <!-- Add logging framework, to produce console output when running in the IDE. -->
>         <!-- These dependencies are excluded from the application JAR by default. -->
>         <dependency>
>             <groupId>org.slf4j</groupId>
>             <artifactId>slf4j-log4j12</artifactId>
>             <version>1.7.7</version>
>             <scope>runtime</scope>
>         </dependency>
>         <dependency>
>             <groupId>log4j</groupId>
>             <artifactId>log4j</artifactId>
>             <version>1.2.17</version>
>             <scope>runtime</scope>
>         </dependency>
>     </dependencies>
>
>     <build>
>         <plugins>
>
>             <!-- Java Compiler -->
>             <plugin>
>                 <groupId>org.apache.maven.plugins</groupId>
>                 <artifactId>maven-compiler-plugin</artifactId>
>                 <version>3.1</version>
>                 <configuration>
>                     <source>${java.version}</source>
>                     <target>${java.version}</target>
>                 </configuration>
>             </plugin>
>
>             <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
>             <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
>             <plugin>
>                 <groupId>org.apache.maven.plugins</groupId>
>                 <artifactId>maven-shade-plugin</artifactId>
>                 <version>3.0.0</version>
>                 <executions>
>                     <!-- Run shade goal on package phase -->
>                     <execution>
>                         <phase>package</phase>
>                         <goals>
>                             <goal>shade</goal>
>                         </goals>
>                         <configuration>
>                             <artifactSet>
>                                 <excludes>
>                                     <exclude>org.apache.flink:force-shading</exclude>
>                                     <exclude>com.google.code.findbugs:jsr305</exclude>
>                                     <exclude>org.slf4j:*</exclude>
>                                     <exclude>log4j:*</exclude>
>                                 </excludes>
>                             </artifactSet>
>                             <filters>
>                                 <filter>
>                                     <!-- Do not copy the signatures in the META-INF folder.
>                                     Otherwise, this might cause SecurityExceptions when using the JAR. -->
>                                     <artifact>*:*</artifact>
>                                     <excludes>
>                                         <exclude>META-INF/*.SF</exclude>
>                                         <exclude>META-INF/*.DSA</exclude>
>                                         <exclude>META-INF/*.RSA</exclude>
>                                     </excludes>
>                                 </filter>
>                             </filters>
>                             <transformers>
>                                 <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
>                                     <mainClass>com.harsh.test.StreamingJob</mainClass>
>                                 </transformer>
>                             </transformers>
>                         </configuration>
>                     </execution>
>                 </executions>
>             </plugin>
>         </plugins>
>
>         <pluginManagement>
>             <plugins>
>
>                 <!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. -->
>                 <plugin>
>                     <groupId>org.eclipse.m2e</groupId>
>                     <artifactId>lifecycle-mapping</artifactId>
>                     <version>1.0.0</version>
>                     <configuration>
>                         <lifecycleMappingMetadata>
>                             <pluginExecutions>
>                                 <pluginExecution>
>                                     <pluginExecutionFilter>
>                                         <groupId>org.apache.maven.plugins</groupId>
>                                         <artifactId>maven-shade-plugin</artifactId>
>                                         <versionRange>[3.0.0,)</versionRange>
>                                         <goals>
>                                             <goal>shade</goal>
>                                         </goals>
>                                     </pluginExecutionFilter>
>                                     <action>
>                                         <ignore/>
>                                     </action>
>                                 </pluginExecution>
>                                 <pluginExecution>
>                                     <pluginExecutionFilter>
>                                         <groupId>org.apache.maven.plugins</groupId>
>                                         <artifactId>maven-compiler-plugin</artifactId>
>                                         <versionRange>[3.1,)</versionRange>
>                                         <goals>
>                                             <goal>testCompile</goal>
>                                             <goal>compile</goal>
>                                         </goals>
>                                     </pluginExecutionFilter>
>                                     <action>
>                                         <ignore/>
>                                     </action>
>                                 </pluginExecution>
>                             </pluginExecutions>
>                         </lifecycleMappingMetadata>
>                     </configuration>
>                 </plugin>
>             </plugins>
>         </pluginManagement>
>     </build>
>
>     <!-- This profile helps to make things run out of the box in IntelliJ -->
>     <!-- It adds Flink's core classes to the runtime class path. -->
>     <!-- Otherwise they are missing in IntelliJ, because the dependency is 'provided' -->
>     <profiles>
>         <profile>
>             <id>add-dependencies-for-IDEA</id>
>
>             <activation>
>                 <property>
>                     <name>idea.version</name>
>                 </property>
>             </activation>
>
>             <dependencies>
>                 <dependency>
>                     <groupId>org.apache.flink</groupId>
>                     <artifactId>flink-java</artifactId>
>                     <version>${flink.version}</version>
>                     <scope>compile</scope>
>                 </dependency>
>                 <dependency>
>                     <groupId>org.apache.flink</groupId>
>                     <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
>                     <version>${flink.version}</version>
>                     <scope>compile</scope>
>                 </dependency>
>             </dependencies>
>         </profile>
>     </profiles>
>
> </project>
>
>
> Please help me resolve the issue.
>
> Thanks
>
>
>
>