You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2014/12/18 19:46:15 UTC

[79/82] [abbrv] incubator-flink git commit: Exclude netty dependency from hadoop-mapreduce-client-core to resolve dependency conflict

Exclude netty dependency from hadoop-mapreduce-client-core to resolve dependency conflict


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/b962243b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/b962243b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/b962243b

Branch: refs/heads/master
Commit: b962243b4c190ad266951855ab1439e2a1b096ad
Parents: f5618fa
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Dec 17 15:22:22 2014 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu Dec 18 18:58:33 2014 +0100

----------------------------------------------------------------------
 .../example/HadoopMapredCompatWordCount.java    |  4 +-
 .../mapred/wrapper/HadoopInputSplit.java        |  6 +++
 .../mapreduce/HadoopOutputFormat.java           | 45 +++++++++++++++++++-
 .../mapreduce/example/WordCount.java            |  2 +-
 .../src/test/resources/log4j-test.properties    | 10 ++++-
 .../org/apache/flink/yarn/YarnJobManager.scala  |  2 +-
 .../scala/org/apache/flink/yarn/YarnUtils.scala |  5 +--
 pom.xml                                         | 11 +++++
 8 files changed, 76 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b962243b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/example/HadoopMapredCompatWordCount.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/example/HadoopMapredCompatWordCount.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/example/HadoopMapredCompatWordCount.java
index de20fab..81b1f67 100644
--- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/example/HadoopMapredCompatWordCount.java
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/example/HadoopMapredCompatWordCount.java
@@ -92,7 +92,7 @@ public class HadoopMapredCompatWordCount {
 			// normalize and split the line
 			String line = v.toString();
 			String[] tokens = line.toLowerCase().split("\\W+");
-			
+
 			// emit the pairs
 			for (String token : tokens) {
 				if (token.length() > 0) {
@@ -119,8 +119,8 @@ public class HadoopMapredCompatWordCount {
 			while(vs.hasNext()) {
 				cnt += vs.next().get();
 			}
+
 			out.collect(k, new LongWritable(cnt));
-			
 		}
 		
 		@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b962243b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopInputSplit.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopInputSplit.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopInputSplit.java
index 3fb66c2..77c40f5 100644
--- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopInputSplit.java
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/wrapper/HadoopInputSplit.java
@@ -92,6 +92,7 @@ public class HadoopInputSplit implements InputSplit {
 	private void writeObject(ObjectOutputStream out) throws IOException {
 		out.writeInt(splitNumber);
 		out.writeUTF(hadoopInputSplitTypeName);
+		jobConf.write(out);
 		hadoopInputSplit.write(out);
 
 	}
@@ -110,6 +111,11 @@ public class HadoopInputSplit implements InputSplit {
 				throw new RuntimeException("Unable to create InputSplit", e);
 			}
 		}
+		jobConf = new JobConf();
+		jobConf.readFields(in);
+		if (this.hadoopInputSplit instanceof Configurable) {
+			((Configurable) this.hadoopInputSplit).setConf(this.jobConf);
+		}
 		this.hadoopInputSplit.readFields(in);
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b962243b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopOutputFormat.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopOutputFormat.java
index 402372c..cce7695 100644
--- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopOutputFormat.java
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/HadoopOutputFormat.java
@@ -19,6 +19,7 @@
 
 package org.apache.flink.hadoopcompatibility.mapreduce;
 
+import java.io.File;
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
@@ -116,7 +117,9 @@ public class HadoopOutputFormat<K extends Writable,V extends Writable> implement
 		} catch (Exception e) {
 			throw new RuntimeException(e);
 		}
-		
+
+		System.out.println("HadoopOutputFormat: Write to " + this.configuration.get("mapred" +
+				".output.dir"));
 		this.fileOutputCommitter = new FileOutputCommitter(new Path(this.configuration.get("mapred.output.dir")), context);
 		
 		try {
@@ -133,6 +136,22 @@ public class HadoopOutputFormat<K extends Writable,V extends Writable> implement
 		} catch (InterruptedException e) {
 			throw new IOException("Could not create RecordWriter.", e);
 		}
+
+		File dir = new File(this.configuration.get("mapred.output.dir"));
+		if(dir.exists()){
+			if(dir.isDirectory()){
+				File[] files = dir.listFiles();
+				System.out.println(configuration.get("mapred.output.dir") + " contains the " +
+						"following files.");
+				for(File file: files){
+					System.out.println(file.toPath());
+				}
+			}else{
+				System.out.println(configuration.get("mapred.output.dir") + " is not a directory.");
+			}
+		}else{
+			System.out.println(configuration.get("mapred.output.dir") + " does not exist yet.");
+		}
 	}
 	
 	
@@ -151,6 +170,7 @@ public class HadoopOutputFormat<K extends Writable,V extends Writable> implement
 	 */
 	@Override
 	public void close() throws IOException {
+		System.out.println("HadoopOutputFormat: Close");
 		try {
 			this.recordWriter.close(this.context);
 		} catch (InterruptedException e) {
@@ -162,6 +182,25 @@ public class HadoopOutputFormat<K extends Writable,V extends Writable> implement
 		}
 		
 		Path outputPath = new Path(this.configuration.get("mapred.output.dir"));
+
+		File dir = new File(this.configuration.get("mapred.output.dir"));
+		if(dir.exists()){
+			if(dir.isDirectory()){
+				File[] files = dir.listFiles();
+				System.out.println("Close: " +configuration.get("mapred.output.dir") + " contains" +
+								" the " +
+						"following files.");
+				for(File file: files){
+					System.out.println(file.toPath());
+				}
+			}else{
+				System.out.println("Close: " +configuration.get("mapred.output.dir") + " is not a" +
+						" directory.");
+			}
+		}else{
+			System.out.println("Close: " +configuration.get("mapred.output.dir") + " does not " +
+					"exist yet)).");
+		}
 		
 		// rename tmp-file to final name
 		FileSystem fs = FileSystem.get(outputPath.toUri(), this.configuration);
@@ -171,7 +210,11 @@ public class HadoopOutputFormat<K extends Writable,V extends Writable> implement
 		String tmpFile = tmpFileTemplate.substring(0,11-taskNumberStr.length())+taskNumberStr;
 		
 		if(fs.exists(new Path(outputPath.toString()+"/"+tmpFile))) {
+			System.out.println("Rename file " +  new Path(outputPath.toString()+"/"+tmpFile) + " " +
+					"to " + new Path(outputPath.toString()+"/"+taskNumberStr));
 			fs.rename(new Path(outputPath.toString()+"/"+tmpFile), new Path(outputPath.toString()+"/"+taskNumberStr));
+		}else{
+			System.out.println("File does not exist?");
 		}
 	}
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b962243b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/WordCount.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/WordCount.java b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/WordCount.java
index 2b99fd2..271ee6c 100644
--- a/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/WordCount.java
+++ b/flink-addons/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapreduce/example/WordCount.java
@@ -95,7 +95,7 @@ public class WordCount {
 			// normalize and split the line
 			String line = value.f1.toString();
 			String[] tokens = line.toLowerCase().split("\\W+");
-			
+
 			// emit the pairs
 			for (String token : tokens) {
 				if (token.length() > 0) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b962243b/flink-addons/flink-hadoop-compatibility/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-addons/flink-hadoop-compatibility/src/test/resources/log4j-test.properties b/flink-addons/flink-hadoop-compatibility/src/test/resources/log4j-test.properties
index 2fb9345..0b686e5 100644
--- a/flink-addons/flink-hadoop-compatibility/src/test/resources/log4j-test.properties
+++ b/flink-addons/flink-hadoop-compatibility/src/test/resources/log4j-test.properties
@@ -16,4 +16,12 @@
 # limitations under the License.
 ################################################################################
 
-log4j.rootLogger=OFF
\ No newline at end of file
+# Set root logger level to OFF and its only appender to A1.
+log4j.rootLogger=OFF, A1
+
+# A1 is set to be a ConsoleAppender.
+log4j.appender.A1=org.apache.log4j.ConsoleAppender
+
+# A1 uses PatternLayout.
+log4j.appender.A1.layout=org.apache.log4j.PatternLayout
+log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b962243b/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala b/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
index af08f7b..aa5eb13 100644
--- a/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
+++ b/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
@@ -22,7 +22,7 @@ import java.io.{IOException, File}
 import java.nio.ByteBuffer
 import java.util.{ Collections}
 
-import akka.actor.{PoisonPill, ActorRef}
+import akka.actor.{ActorRef}
 import org.apache.flink.configuration.ConfigConstants
 import org.apache.flink.runtime.ActorLogMessages
 import org.apache.flink.runtime.jobmanager.JobManager

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b962243b/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnUtils.scala
----------------------------------------------------------------------
diff --git a/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnUtils.scala b/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnUtils.scala
index 86b06e1..245651d 100644
--- a/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnUtils.scala
+++ b/flink-addons/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnUtils.scala
@@ -42,8 +42,8 @@ object YarnUtils {
   def getConfigString: String = {
     """
     |akka{
-    |  loglevel = "INFO"
-    |  stdout-loglevel = "INFO"
+    |  loglevel = "DEBUG"
+    |  stdout-loglevel = "DEBUG"
     |  log-dead-letters-during-shutdown = off
     |  log-dead-letters = off
     |
@@ -56,7 +56,6 @@ object YarnUtils {
     |
     |    netty{
     |      tcp{
-    |        port = 0
     |        transport-class = "akka.remote.transport.netty.NettyTransport"
     |        tcp-nodelay = on
     |        maximum-frame-size = 1MB

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/b962243b/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 2a252f8..a166609 100644
--- a/pom.xml
+++ b/pom.xml
@@ -394,6 +394,17 @@ under the License.
 							</exclusion>
 						</exclusions>
 					</dependency>
+					<dependency>
+						<groupId>org.apache.hadoop</groupId>
+						<artifactId>hadoop-mapreduce-client-core</artifactId>
+						<version>${hadoop.version}</version>
+						<exclusions>
+							<exclusion>
+								<groupId>org.jboss.netty</groupId>
+								<artifactId>netty</artifactId>
+							</exclusion>
+						</exclusions>
+					</dependency>
 				</dependencies>
 			</dependencyManagement>
 		</profile>