You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/08/27 13:25:48 UTC
[31/51] [abbrv] flink git commit: [FLINK-2386] [kafka connector]
Remove copied Kafka code again. Implemented our own topic metadata retrieval.
http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
index f38c557..863f7ac 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
@@ -38,7 +38,7 @@ import org.slf4j.LoggerFactory;
* BarrierBuffer continues receiving buffers from the blocked channels and stores them internally until
* the blocks are released.</p>
*/
-public class BarrierBuffer implements CheckpointBarrierHandler, Runnable {
+public class BarrierBuffer implements CheckpointBarrierHandler {
private static final Logger LOG = LoggerFactory.getLogger(BarrierBuffer.class);
@@ -77,18 +77,6 @@ public class BarrierBuffer implements CheckpointBarrierHandler, Runnable {
/** Flag to indicate whether we have drawn all available input */
private boolean endOfStream;
-
- private int returnedBuffers;
-
- private int spilledBuffers;
-
- private int reReadBuffers;
-
-
- private Thread debugPrinter;
-
- private volatile boolean printerRunning = true;
-
/**
*
* @param inputGate The input gate to draw the buffers and events from.
@@ -103,10 +91,6 @@ public class BarrierBuffer implements CheckpointBarrierHandler, Runnable {
this.bufferSpiller = new BufferSpiller(ioManager, inputGate.getPageSize());
this.queuedBuffered = new ArrayDeque<BufferSpiller.SpilledBufferOrEventSequence>();
-
- this.debugPrinter = new Thread(this, "BB debugger");
- this.debugPrinter.setDaemon(true);
- this.debugPrinter.start();
}
// ------------------------------------------------------------------------
@@ -127,21 +111,14 @@ public class BarrierBuffer implements CheckpointBarrierHandler, Runnable {
completeBufferedSequence();
return getNextNonBlocked();
}
- else if (next.isBuffer()) {
- reReadBuffers++;
- }
}
if (next != null) {
if (isBlocked(next.getChannelIndex())) {
// if the channel is blocked, we just store the BufferOrEvent
bufferSpiller.add(next);
- if (next.isBuffer()) {
- spilledBuffers++;
- }
}
else if (next.isBuffer()) {
- returnedBuffers++;
return next;
}
else if (next.getEvent().getClass() == CheckpointBarrier.class) {
@@ -245,9 +222,6 @@ public class BarrierBuffer implements CheckpointBarrierHandler, Runnable {
@Override
public void cleanup() throws IOException {
- printerRunning = false;
- debugPrinter.interrupt();
-
bufferSpiller.close();
if (currentBuffered != null) {
currentBuffered.cleanup();
@@ -343,21 +317,4 @@ public class BarrierBuffer implements CheckpointBarrierHandler, Runnable {
return String.format("last checkpoint: %d, current barriers: %d, closed channels: %d",
currentCheckpointId, numBarriersReceived, numClosedChannels);
}
-
- // -------------------------------------
- // TEMP HACK for debugging
-
- public void run() {
- while (printerRunning) {
- try {
- Thread.sleep(5000);
- }
- catch (InterruptedException e) {
- // ignore
- }
-
- LOG.info("=====================> BARRIER BUFFER: returned buffers: {}, spilled buffers: {}, re-read buffers: {}",
- returnedBuffers, spilledBuffers, reReadBuffers);
- }
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
index 7ab6722..763885c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/ClusterUtil.java
@@ -27,12 +27,10 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
public class ClusterUtil {
+
private static final Logger LOG = LoggerFactory.getLogger(ClusterUtil.class);
- private static LocalFlinkMiniCluster exec = null;
-
/**
* Executes the given JobGraph locally, on a FlinkMiniCluster
*
@@ -42,8 +40,6 @@ public class ClusterUtil {
* numberOfTaskTrackers
* @param memorySize
* memorySize
- * @param printDuringExecution
- * @param detached
* @param customConf
* Custom configuration for the LocalExecutor. Can be null.
* @return The result of the job execution, containing elapsed time and accumulators.
@@ -67,7 +63,7 @@ public class ClusterUtil {
try {
exec = new LocalFlinkMiniCluster(configuration, true);
- if(detached) {
+ if (detached) {
exec.submitJobDetached(jobGraph);
return null;
} else {
@@ -84,17 +80,7 @@ public class ClusterUtil {
/**
* Start a job in a detached mode on a local mini cluster.
*/
- public static void startOnMiniCluster(JobGraph jobGraph, int parallelism, long memorySize) throws Exception {
- runOnMiniCluster(jobGraph, parallelism, memorySize, true, true, null);
- }
-
- public static void stopOnMiniCluster() {
- if(exec != null) {
- exec.stop();
- exec = null;
- } else {
- throw new IllegalStateException("Cluster was not started via .start(...)");
- }
+ public static void startOnMiniCluster(JobGraph jobGraph, int parallelism) throws Exception {
+ runOnMiniCluster(jobGraph, parallelism, -1, true, true, null);
}
-
}
http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpoinedITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpoinedITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpoinedITCase.java
index 10d4d66..8a75de5 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpoinedITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpoinedITCase.java
@@ -31,6 +31,8 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
@@ -55,6 +57,8 @@ import static org.junit.Assert.assertTrue;
@SuppressWarnings("serial")
public class StateCheckpoinedITCase extends StreamFaultToleranceTestBase {
+ private static final Logger LOG = LoggerFactory.getLogger(StateCheckpoinedITCase.class);
+
final long NUM_STRINGS = 10_000_000L;
/**
@@ -72,7 +76,9 @@ public class StateCheckpoinedITCase extends StreamFaultToleranceTestBase {
final long failurePosMax = (long) (0.7 * NUM_STRINGS / PARALLELISM);
final long failurePos = (new Random().nextLong() % (failurePosMax - failurePosMin)) + failurePosMin;
-
+
+ env.enableCheckpointing(200);
+
DataStream<String> stream = env.addSource(new StringGeneratingSourceFunction(NUM_STRINGS));
stream
@@ -95,8 +101,10 @@ public class StateCheckpoinedITCase extends StreamFaultToleranceTestBase {
@Override
public void postSubmit() {
- if (!OnceFailingAggregator.wasCheckpointedBeforeFailure) {
- System.err.println("Test inconclusive: failure occurred before first checkpoint");
+ //assertTrue("Test inconclusive: failure occurred before first checkpoint",
+ // OnceFailingAggregator.wasCheckpointedBeforeFailure);
+ if(!OnceFailingAggregator.wasCheckpointedBeforeFailure) {
+ LOG.warn("Test inconclusive: failure occurred before first checkpoint");
}
long filterSum = 0;
http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/tools/maven/suppressions.xml
----------------------------------------------------------------------
diff --git a/tools/maven/suppressions.xml b/tools/maven/suppressions.xml
index d76ed47..2c29054 100644
--- a/tools/maven/suppressions.xml
+++ b/tools/maven/suppressions.xml
@@ -25,5 +25,4 @@ under the License.
<suppressions>
<suppress files="org[\\/]apache[\\/]flink[\\/]api[\\/]io[\\/]avro[\\/]example[\\/]User.java" checks="[a-zA-Z0-9]*"/>
<suppress files="org[\\/]apache[\\/]flink[\\/]api[\\/]io[\\/]avro[\\/]generated[\\/].*.java" checks="[a-zA-Z0-9]*"/>
- <suppress files="org[\\/]apache[\\/]flink[\\/]kafka_backport[\\/].*.java" checks="[a-zA-Z0-9]*"/>
</suppressions>