You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2014/12/18 19:46:15 UTC
[79/82] [abbrv] incubator-flink git commit: Exclude netty dependency
from hadoop-mapreduce-client-core to resolve dependency conflict
Exclude netty dependency from hadoop-mapreduce-client-core to resolve dependency conflict
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/b962243b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/b962243b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/b962243b
Branch: refs/heads/master
Commit: b962243b4c190ad266951855ab1439e2a1b096ad
Parents: f5618fa
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Dec 17 15:22:22 2014 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Dec 18 18:58:33 2014 +0100
----------------------------------------------------------------------
.../example/HadoopMapredCompatWordCount.java | 4 +-
.../mapred/wrapper/HadoopInputSplit.java | 6 +++
.../mapreduce/HadoopOutputFormat.java | 45 +++++++++++++++++++-
.../mapreduce/example/WordCount.java | 2 +-
.../src/test/resources/log4j-test.properties | 10 ++++-
.../org/apache/flink/yarn/YarnJobManager.scala | 2 +-
.../scala/org/apache/flink/yarn/YarnUtils.scala | 5 +--
pom.xml | 11 +++++
8 files changed, 76 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b962243b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/example/HadoopMapredCompatWordCount.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/example/HadoopMapredCompatWordCount.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/example/HadoopMapredCompatWordCount.java
index de20fab..81b1f67 100644
--- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/example/HadoopMapredCompatWordCount.java
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/example/HadoopMapredCompatWordCount.java
@@ -92,7 +92,7 @@ public class HadoopMapredCompatWordCount {
// normalize and split the line
String line = v.toString();
String[] tokens = line.toLowerCase().split("\\W+");
-
+
// emit the pairs
for (String token : tokens) {
if (token.length() > 0) {
@@ -119,8 +119,8 @@ public class HadoopMapredCompatWordCount {
while(vs.hasNext()) {
cnt += vs.next().get();
}
+
out.collect(k, new LongWritable(cnt));
-
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b962243b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopInputSplit.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopInputSplit.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopInputSplit.java
index 3fb66c2..77c40f5 100644
--- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopInputSplit.java
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopInputSplit.java
@@ -92,6 +92,7 @@ public class HadoopInputSplit implements InputSplit {
private void writeObject(ObjectOutputStream out) throws IOException {
out.writeInt(splitNumber);
out.writeUTF(hadoopInputSplitTypeName);
+ jobConf.write(out);
hadoopInputSplit.write(out);
}
@@ -110,6 +111,11 @@ public class HadoopInputSplit implements InputSplit {
throw new RuntimeException("Unable to create InputSplit", e);
}
}
+ jobConf = new JobConf();
+ jobConf.readFields(in);
+ if (this.hadoopInputSplit instanceof Configurable) {
+ ((Configurable) this.hadoopInputSplit).setConf(this.jobConf);
+ }
this.hadoopInputSplit.readFields(in);
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b962243b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopOutputFormat.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopOutputFormat.java
index 402372c..cce7695 100644
--- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopOutputFormat.java
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopOutputFormat.java
@@ -19,6 +19,7 @@
package org.apache.flink.hadoopcompatibility.mapreduce;
+import java.io.File;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
@@ -116,7 +117,9 @@ public class HadoopOutputFormat<K extends Writable,V extends Writable> implement
} catch (Exception e) {
throw new RuntimeException(e);
}
-
+
+ System.out.println("HadoopOutputFormat: Write to " + this.configuration.get("mapred" +
+ ".output.dir"));
this.fileOutputCommitter = new FileOutputCommitter(new Path(this.configuration.get("mapred.output.dir")), context);
try {
@@ -133,6 +136,22 @@ public class HadoopOutputFormat<K extends Writable,V extends Writable> implement
} catch (InterruptedException e) {
throw new IOException("Could not create RecordWriter.", e);
}
+
+ File dir = new File(this.configuration.get("mapred.output.dir"));
+ if(dir.exists()){
+ if(dir.isDirectory()){
+ File[] files = dir.listFiles();
+ System.out.println(configuration.get("mapred.output.dir") + " contains the " +
+ "following files.");
+ for(File file: files){
+ System.out.println(file.toPath());
+ }
+ }else{
+ System.out.println(configuration.get("mapred.output.dir") + " is not a directory.");
+ }
+ }else{
+ System.out.println(configuration.get("mapred.output.dir") + " does not exist yet.");
+ }
}
@@ -151,6 +170,7 @@ public class HadoopOutputFormat<K extends Writable,V extends Writable> implement
*/
@Override
public void close() throws IOException {
+ System.out.println("HadoopOutputFormat: Close");
try {
this.recordWriter.close(this.context);
} catch (InterruptedException e) {
@@ -162,6 +182,25 @@ public class HadoopOutputFormat<K extends Writable,V extends Writable> implement
}
Path outputPath = new Path(this.configuration.get("mapred.output.dir"));
+
+ File dir = new File(this.configuration.get("mapred.output.dir"));
+ if(dir.exists()){
+ if(dir.isDirectory()){
+ File[] files = dir.listFiles();
+ System.out.println("Close: " +configuration.get("mapred.output.dir") + " contains" +
+ " the " +
+ "following files.");
+ for(File file: files){
+ System.out.println(file.toPath());
+ }
+ }else{
+ System.out.println("Close: " +configuration.get("mapred.output.dir") + " is not a" +
+ " directory.");
+ }
+ }else{
+ System.out.println("Close: " +configuration.get("mapred.output.dir") + " does not " +
+ "exist yet)).");
+ }
// rename tmp-file to final name
FileSystem fs = FileSystem.get(outputPath.toUri(), this.configuration);
@@ -171,7 +210,11 @@ public class HadoopOutputFormat<K extends Writable,V extends Writable> implement
String tmpFile = tmpFileTemplate.substring(0,11-taskNumberStr.length())+taskNumberStr;
if(fs.exists(new Path(outputPath.toString()+"/"+tmpFile))) {
+ System.out.println("Rename file " + new Path(outputPath.toString()+"/"+tmpFile) + " " +
+ "to " + new Path(outputPath.toString()+"/"+taskNumberStr));
fs.rename(new Path(outputPath.toString()+"/"+tmpFile), new Path(outputPath.toString()+"/"+taskNumberStr));
+ }else{
+ System.out.println("File does not exist?");
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b962243b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/WordCount.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/WordCount.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/WordCount.java
index 2b99fd2..271ee6c 100644
--- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/WordCount.java
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/WordCount.java
@@ -95,7 +95,7 @@ public class WordCount {
// normalize and split the line
String line = value.f1.toString();
String[] tokens = line.toLowerCase().split("\\W+");
-
+
// emit the pairs
for (String token : tokens) {
if (token.length() > 0) {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b962243b/flink-addons/flink-hadoop-compatibility/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/test/resources/log4j-test.properties b/flink-addons/flink-hadoop-compatibility/src/test/resources/log4j-test.properties
index 2fb9345..0b686e5 100644
--- a/flink-addons/flink-hadoop-compatibility/src/test/resources/log4j-test.properties
+++ b/flink-addons/flink-hadoop-compatibility/src/test/resources/log4j-test.properties
@@ -16,4 +16,12 @@
# limitations under the License.
################################################################################
-log4j.rootLogger=OFF
\ No newline at end of file
+# Set root logger level to DEBUG and its only appender to A1.
+log4j.rootLogger=OFF, A1
+
+# A1 is set to be a ConsoleAppender.
+log4j.appender.A1=org.apache.log4j.ConsoleAppender
+
+# A1 uses PatternLayout.
+log4j.appender.A1.layout=org.apache.log4j.PatternLayout
+log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b962243b/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala b/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
index af08f7b..aa5eb13 100644
--- a/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
+++ b/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
@@ -22,7 +22,7 @@ import java.io.{IOException, File}
import java.nio.ByteBuffer
import java.util.{ Collections}
-import akka.actor.{PoisonPill, ActorRef}
+import akka.actor.{ActorRef}
import org.apache.flink.configuration.ConfigConstants
import org.apache.flink.runtime.ActorLogMessages
import org.apache.flink.runtime.jobmanager.JobManager
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b962243b/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnUtils.scala
----------------------------------------------------------------------
diff --git a/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnUtils.scala b/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnUtils.scala
index 86b06e1..245651d 100644
--- a/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnUtils.scala
+++ b/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnUtils.scala
@@ -42,8 +42,8 @@ object YarnUtils {
def getConfigString: String = {
"""
|akka{
- | loglevel = "INFO"
- | stdout-loglevel = "INFO"
+ | loglevel = "DEBUG"
+ | stdout-loglevel = "DEBUG"
| log-dead-letters-during-shutdown = off
| log-dead-letters = off
|
@@ -56,7 +56,6 @@ object YarnUtils {
|
| netty{
| tcp{
- | port = 0
| transport-class = "akka.remote.transport.netty.NettyTransport"
| tcp-nodelay = on
| maximum-frame-size = 1MB
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b962243b/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 2a252f8..a166609 100644
--- a/pom.xml
+++ b/pom.xml
@@ -394,6 +394,17 @@ under the License.
</exclusion>
</exclusions>
</dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-core</artifactId>
+ <version>${hadoop.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.jboss.netty</groupId>
+ <artifactId>netty</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
</dependencies>
</dependencyManagement>
</profile>