You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/08/27 13:25:48 UTC

[31/51] [abbrv] flink git commit: [FLINK-2386] [kafka connector] Remove copied Kafka code again. Implemented our own topic metadata retrieval.

http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
index f38c557..863f7ac 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
@@ -38,7 +38,7 @@ import org.slf4j.LoggerFactory;
  * BarrierBuffer continues receiving buffers from the blocked channels and stores them internally until 
  * the blocks are released.</p>
  */
-public class BarrierBuffer implements CheckpointBarrierHandler, Runnable {
+public class BarrierBuffer implements CheckpointBarrierHandler {
 
 	private static final Logger LOG = LoggerFactory.getLogger(BarrierBuffer.class);
 	
@@ -77,18 +77,6 @@ public class BarrierBuffer implements CheckpointBarrierHandler, Runnable {
 	/** Flag to indicate whether we have drawn all available input */
 	private boolean endOfStream;
 
-
-	private int returnedBuffers;
-	
-	private int spilledBuffers;
-	
-	private int reReadBuffers;
-	
-	
-	private Thread debugPrinter;
-	
-	private volatile boolean printerRunning = true;
-	
 	/**
 	 * 
 	 * @param inputGate The input gate to draw the buffers and events from.
@@ -103,10 +91,6 @@ public class BarrierBuffer implements CheckpointBarrierHandler, Runnable {
 		
 		this.bufferSpiller = new BufferSpiller(ioManager, inputGate.getPageSize());
 		this.queuedBuffered = new ArrayDeque<BufferSpiller.SpilledBufferOrEventSequence>();
-		
-		this.debugPrinter = new Thread(this, "BB debugger");
-		this.debugPrinter.setDaemon(true);
-		this.debugPrinter.start();
 	}
 
 	// ------------------------------------------------------------------------
@@ -127,21 +111,14 @@ public class BarrierBuffer implements CheckpointBarrierHandler, Runnable {
 					completeBufferedSequence();
 					return getNextNonBlocked();
 				}
-				else if (next.isBuffer()) {
-					reReadBuffers++;
-				}
 			}
 			
 			if (next != null) {
 				if (isBlocked(next.getChannelIndex())) {
 					// if the channel is blocked we, we just store the BufferOrEvent
 					bufferSpiller.add(next);
-					if (next.isBuffer()) {
-						spilledBuffers++;
-					}
 				}
 				else if (next.isBuffer()) {
-					returnedBuffers++;
 					return next;
 				}
 				else if (next.getEvent().getClass() == CheckpointBarrier.class) {
@@ -245,9 +222,6 @@ public class BarrierBuffer implements CheckpointBarrierHandler, Runnable {
 
 	@Override
 	public void cleanup() throws IOException {
-		printerRunning = false;
-		debugPrinter.interrupt();
-		
 		bufferSpiller.close();
 		if (currentBuffered != null) {
 			currentBuffered.cleanup();
@@ -343,21 +317,4 @@ public class BarrierBuffer implements CheckpointBarrierHandler, Runnable {
 		return String.format("last checkpoint: %d, current barriers: %d, closed channels: %d",
 				currentCheckpointId, numBarriersReceived, numClosedChannels);
 	}
-	
-	// -------------------------------------
-	// TEMP HACK for debugging
-	
-	public void run() {
-		while (printerRunning) {
-			try {
-				Thread.sleep(5000);
-			}
-			catch (InterruptedException e) {
-				// ignore
-			}
-			
-			LOG.info("=====================> BARRIER BUFFER: returned buffers: {}, spilled buffers: {}, re-read buffers: {}",
-					returnedBuffers, spilledBuffers, reReadBuffers);
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
index 7ab6722..763885c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
@@ -27,12 +27,10 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
-
 public class ClusterUtil {
+	
 	private static final Logger LOG = LoggerFactory.getLogger(ClusterUtil.class);
 
-	private static LocalFlinkMiniCluster exec = null;
-
 	/**
 	 * Executes the given JobGraph locally, on a FlinkMiniCluster
 	 * 
@@ -42,8 +40,6 @@ public class ClusterUtil {
 	 *            numberOfTaskTrackers
 	 * @param memorySize
 	 *            memorySize
-	 * @param printDuringExecution
-	 * @param detached
 	 * @param customConf
 	 * 		Custom configuration for the LocalExecutor. Can be null.
 	 * @return The result of the job execution, containing elapsed time and accumulators.
@@ -67,7 +63,7 @@ public class ClusterUtil {
 
 		try {
 			exec = new LocalFlinkMiniCluster(configuration, true);
-			if(detached) {
+			if (detached) {
 				exec.submitJobDetached(jobGraph);
 				return null;
 			} else {
@@ -84,17 +80,7 @@ public class ClusterUtil {
 	/**
 	 * Start a job in a detached mode on a local mini cluster.
 	 */
-	public static void startOnMiniCluster(JobGraph jobGraph, int parallelism, long memorySize) throws Exception {
-		runOnMiniCluster(jobGraph, parallelism, memorySize, true, true, null);
-	}
-
-	public static void stopOnMiniCluster() {
-		if(exec != null) {
-			exec.stop();
-			exec = null;
-		} else {
-			throw new IllegalStateException("Cluster was not started via .start(...)");
-		}
+	public static void startOnMiniCluster(JobGraph jobGraph, int parallelism) throws Exception {
+		runOnMiniCluster(jobGraph, parallelism, -1, true, true, null);
 	}
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpoinedITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpoinedITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpoinedITCase.java
index 10d4d66..8a75de5 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpoinedITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpoinedITCase.java
@@ -31,6 +31,8 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 import java.io.IOException;
@@ -55,6 +57,8 @@ import static org.junit.Assert.assertTrue;
 @SuppressWarnings("serial")
 public class StateCheckpoinedITCase extends StreamFaultToleranceTestBase {
 
+	private static final Logger LOG = LoggerFactory.getLogger(StateCheckpoinedITCase.class);
+
 	final long NUM_STRINGS = 10_000_000L;
 
 	/**
@@ -72,7 +76,9 @@ public class StateCheckpoinedITCase extends StreamFaultToleranceTestBase {
 		final long failurePosMax = (long) (0.7 * NUM_STRINGS / PARALLELISM);
 
 		final long failurePos = (new Random().nextLong() % (failurePosMax - failurePosMin)) + failurePosMin;
-		
+
+		env.enableCheckpointing(200);
+
 		DataStream<String> stream = env.addSource(new StringGeneratingSourceFunction(NUM_STRINGS));
 
 		stream
@@ -95,8 +101,10 @@ public class StateCheckpoinedITCase extends StreamFaultToleranceTestBase {
 	@Override
 	public void postSubmit() {
 		
-		if (!OnceFailingAggregator.wasCheckpointedBeforeFailure) {
-			System.err.println("Test inconclusive: failure occurred before first checkpoint");
+		//assertTrue("Test inconclusive: failure occurred before first checkpoint",
+		//		OnceFailingAggregator.wasCheckpointedBeforeFailure);
+		if(!OnceFailingAggregator.wasCheckpointedBeforeFailure) {
+			LOG.warn("Test inconclusive: failure occurred before first checkpoint");
 		}
 		
 		long filterSum = 0;

http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/tools/maven/suppressions.xml
----------------------------------------------------------------------
diff --git a/tools/maven/suppressions.xml b/tools/maven/suppressions.xml
index d76ed47..2c29054 100644
--- a/tools/maven/suppressions.xml
+++ b/tools/maven/suppressions.xml
@@ -25,5 +25,4 @@ under the License.
 <suppressions>
 		<suppress files="org[\\/]apache[\\/]flink[\\/]api[\\/]io[\\/]avro[\\/]example[\\/]User.java" checks="[a-zA-Z0-9]*"/>
 		<suppress files="org[\\/]apache[\\/]flink[\\/]api[\\/]io[\\/]avro[\\/]generated[\\/].*.java" checks="[a-zA-Z0-9]*"/>
-		<suppress files="org[\\/]apache[\\/]flink[\\/]kafka_backport[\\/].*.java" checks="[a-zA-Z0-9]*"/>
 </suppressions>