Posted to user@flink.apache.org by Yik San Chan <ev...@gmail.com> on 2021/04/07 02:55:58 UTC

Flink: Exception from container-launch exitCode=2

*The question is cross-posted on Stack
Overflow: https://stackoverflow.com/questions/66968180/flink-exception-from-container-launch-exitcode-2.
Viewing the question on Stack Overflow is preferred, as it includes a few
images for better illustration.*

Hi community,

## Flink (Scala) exitCode=2

I have a simple Flink job that reads the 2 columns of a Hive table
`mysource`, adds them up, and writes the result to another Hive table
`mysink`. `mysource` has 2 columns, `a bigint` and `b bigint`, while
`mysink` has only 1 column, `c bigint`.

The job submits successfully; however, I observe that it keeps retrying.

[![enter image description here][1]][1]

When I click into each attempt, it simply shows this:

```
AM Container for appattempt_1607399514900_2511_001267 exited with exitCode: 2
For more detailed output, check application tracking page:
http://cn-hz-h-test-data-flink00:8088/cluster/app/application_1607399514900_2511 Then, click on links to logs of each attempt.
Diagnostics: Exception from container-launch.
Container id: container_e13_1607399514900_2511_1267_000001
Exit code: 2
Stack trace: ExitCodeException exitCode=2:
at org.apache.hadoop.util.Shell.runCommand(Shell.java:582)
at org.apache.hadoop.util.Shell.run(Shell.java:479)
at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:773)
at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:212)
at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Container exited with a non-zero exit code 2
Failing this attempt
```

However, the "Logs" has no useful info - it complains about the logging
lib, but I believe they are really warnings, not errors.

```
LogType:jobmanager.err
Log Upload Time:Wed Apr 07 10:30:52 +0800 2021
LogLength:1010
Log Contents:
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/data/apache/hadoop/hadoop-2.7.3/logs/tmp/nm-local-dir/usercache/zhongtai/appcache/application_1607399514900_2509/filecache/10/featurepipelines-0.1.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/data/apache/hadoop/hadoop-2.7.3/logs/tmp/nm-local-dir/filecache/302/log4j-slf4j-impl-2.12.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/data/apache/hadoop/hadoop-2.7.3/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
log4j:WARN No appenders could be found for logger (org.apache.flink.runtime.entrypoint.ClusterEntrypoint).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
End of LogType:jobmanager.err

LogType:jobmanager.out
Log Upload Time:Wed Apr 07 10:30:52 +0800 2021
LogLength:0
Log Contents:
End of LogType:jobmanager.out
```

This is the job written in Scala.

```scala
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.{EnvironmentSettings, SqlDialect}
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.catalog.hive.HiveCatalog

object HiveToyExample {
  def main(args: Array[String]): Unit = {
    val settings = EnvironmentSettings.newInstance.build
    val execEnv = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(execEnv, settings)

    val hiveCatalog = new HiveCatalog(
      "myhive",
      "aiinfra",
      "/data/apache/hive/apache-hive-2.1.0-bin/conf/"
    )
    tableEnv.registerCatalog("myhive", hiveCatalog)
    tableEnv.useCatalog("myhive")

    tableEnv.getConfig.setSqlDialect(SqlDialect.DEFAULT)

    tableEnv
      .executeSql("""
          |INSERT INTO mysink
          |SELECT a + b
          |FROM mysource
          |""".stripMargin)
  }
}
```

Here's the pom.xml.

```xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>exmple</groupId>
<artifactId>featurepipelines</artifactId>
<version>0.1.1</version>
<packaging>jar</packaging>

<name>Feature Pipelines</name>

<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.12.0</flink.version>
<scala.binary.version>2.11</scala.binary.version>
<scala.version>2.11.12</scala.version>
<log4j.version>2.12.1</log4j.version>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-hive_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>2.1.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.7</version>
</dependency>
</dependencies>

<build>
<resources>
<resource>
<directory>src/main/resources</directory>
<filtering>true</filtering>
</resource>
</resources>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.4</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<shadedArtifactAttached>false</shadedArtifactAttached>
<shadedClassifierName>Shade</shadedClassifierName>
<createDependencyReducedPom>false</createDependencyReducedPom>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>4.4.1</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
```

This is how I package the jar.

```
mvn clean package
```

This is how I run the job.

```
flink run \
--yarnname scalaflink-hive-test \
-m yarn-cluster \
-yarnqueue datadev \
--class featurepipelines.ingestion.HiveToyExample \
./featurepipelines-0.1.1.jar
```

## PyFlink rewrite works just fine?!

Since the logic is so simple, I rewrote the job in PyFlink to see what
happens. Here is the PyFlink rewrite.

```python
import os
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import *
from pyflink.table.catalog import HiveCatalog

settings = EnvironmentSettings.new_instance().use_blink_planner().build()
exec_env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(exec_env, environment_settings=settings)

# There exists such a jar in the path
t_env.get_config().get_configuration().set_string(
    "pipeline.jars",
    f"file://{os.getcwd()}/deps/flink-sql-connector-hive-2.2.0_2.11-1.12.0.jar"
)

catalog_name = "myhive"
default_database = "aiinfra"
hive_conf_dir = "/data/apache/hive/apache-hive-2.1.0-bin/conf/"

hive_catalog = HiveCatalog(catalog_name, default_database, hive_conf_dir)
t_env.register_catalog(catalog_name, hive_catalog)
t_env.use_catalog(catalog_name)

TRANSFORM_DML = """
INSERT INTO mysink
SELECT a + b
FROM mysource
"""

t_env.get_config().set_sql_dialect(SqlDialect.DEFAULT)
t_env.execute_sql(TRANSFORM_DML).wait()
```

This is how I run the PyFlink job.

```
flink run \
--yarnname pyflink-hive-test \
-m yarn-cluster \
-yD yarn.application.queue=tech_platform \
-pyarch pyflink1.12.0.zip \
-pyexec /data/software/pyflink1.12.0/bin/python \
-py /data/home/pal-flink/chenyisheng14418/feature-pipelines/pyflink/hive.py
```

Surprisingly, the job runs fine - it finishes quickly, with the result
written to the `mysink` table.

## Why?

Given the comparison, I strongly suspect the Scala job fails because it is
not packaged correctly, even though I follow the [Flink Docs](
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/hive/#program-maven),
which can be verified by looking at my pom:

> If you are building your own program, you need the following dependencies
in your mvn file. It’s recommended not to include these dependencies in the
resulting jar file. You’re supposed to add dependencies as stated above at
runtime.

```
<!-- Flink Dependency -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-hive_2.11</artifactId>
  <version>1.12.0</version>
  <scope>provided</scope>
</dependency>

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-api-java-bridge_2.11</artifactId>
  <version>1.12.0</version>
  <scope>provided</scope>
</dependency>

<!-- Hive Dependency -->
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-exec</artifactId>
    <version>${hive.version}</version>
    <scope>provided</scope>
</dependency>
```

Also, I have included flink-sql-connector-hive-2.2.0_2.11-1.12.0.jar in
/lib of my flink distribution, as suggested in [Flink docs](
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/hive/#using-bundled-hive-jar
):

> the recommended way to add dependency is to use a bundled jar. Separate
jars should be used only if bundled jars don’t meet your needs.

What am I missing?

Best,
Yik San

  [1]: https://i.stack.imgur.com/fBsHS.png
  [2]: https://i.stack.imgur.com/ilNtr.png

Re: Flink: Exception from container-launch exitCode=2

Posted by Till Rohrmann <tr...@apache.org>.
I actually think that the logging problem is caused by Hadoop 2.7.3, which
pulls in slf4j-log4j12-1.7.10.jar. This binding is then used, but there is
no proper configuration file for log4j, because Flink actually uses log4j2.

Cheers,
Till
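
For illustration, here is a minimal sketch of one way to act on this
diagnosis in the pom above: keep the slf4j-log4j12 binding out of the
shaded featurepipelines jar so that only Flink's bundled log4j2 setup is on
the classpath. Marking the dependency as provided (or dropping it
altogether) is an assumption layered on top of this thread, not something
prescribed in it.

```xml
<!-- Sketch (assumption, not from this thread): do not shade an SLF4J binding
     into featurepipelines-0.1.1.jar. The Flink 1.12 distribution already ships
     log4j2 together with its own SLF4J binding (log4j-slf4j-impl) in /lib, as
     the filecache entry in the jobmanager.err output above shows. -->
<dependency>
  <groupId>org.slf4j</groupId>
  <artifactId>slf4j-log4j12</artifactId>
  <version>1.7.7</version>
  <scope>provided</scope>
</dependency>
```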


Re: Flink: Exception from container-launch exitCode=2

Posted by Till Rohrmann <tr...@apache.org>.
Hi Yik San,

to me it looks as if there is a problem with the job and the deployment.
Unfortunately, the logging seems not to have worked. Could you check that
you have a valid log4j.properties file in your conf directory?

Cheers,
Till
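
For reference, a minimal sketch of a log4j2-style log4j.properties of the
kind Flink 1.12 reads from its conf directory. This is an assumed minimal
example; the stock file shipped with the distribution configures more
appenders (e.g. a file appender writing to the container log file).

```
# Minimal log4j2 properties-format configuration (sketch, assumed layout)
rootLogger.level = INFO
rootLogger.appenderRef.console.ref = ConsoleAppender

appender.console.name = ConsoleAppender
appender.console.type = CONSOLE
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
```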
