You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2016/09/05 10:27:06 UTC

flink git commit: [hotfix] Delete leftover (superseded) StreamTaskAsyncCheckpointTest

Repository: flink
Updated Branches:
  refs/heads/master 6e40f5901 -> 9bbb8fab3


[hotfix] Delete leftover (superseded) StreamTaskAsyncCheckpointTest

There is RocksDBAsyncSnapshotTest which tests async snapshots for the
RocksDB state backend. Operators themselves cannot do asynchronous
checkpoints right now.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9bbb8fab
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9bbb8fab
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9bbb8fab

Branch: refs/heads/master
Commit: 9bbb8fab38daff8eb5679e4aa7151f68e0b12226
Parents: 6e40f59
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Mon Sep 5 12:25:28 2016 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Sep 5 12:26:38 2016 +0200

----------------------------------------------------------------------
 .../tasks/StreamTaskAsyncCheckpointTest.java    | 234 -------------------
 1 file changed, 234 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9bbb8fab/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskAsyncCheckpointTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskAsyncCheckpointTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskAsyncCheckpointTest.java
deleted file mode 100644
index 66bc237..0000000
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskAsyncCheckpointTest.java
+++ /dev/null
@@ -1,234 +0,0 @@
-///*
-// * Licensed to the Apache Software Foundation (ASF) under one
-// * or more contributor license agreements.  See the NOTICE file
-// * distributed with this work for additional information
-// * regarding copyright ownership.  The ASF licenses this file
-// * to you under the Apache License, Version 2.0 (the
-// * "License"); you may not use this file except in compliance
-// * with the License.  You may obtain a copy of the License at
-// *
-// *     http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing, software
-// * distributed under the License is distributed on an "AS IS" BASIS,
-// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// * See the License for the specific language governing permissions and
-// * limitations under the License.
-// */
-//
-//package org.apache.flink.streaming.runtime.tasks;
-//
-//import org.apache.flink.api.common.ExecutionConfig;
-//import org.apache.flink.api.common.functions.MapFunction;
-//import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-//import org.apache.flink.core.testutils.OneShotLatch;
-//import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
-//import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
-//import org.apache.flink.streaming.api.graph.StreamConfig;
-//import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
-//import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-//import org.apache.flink.streaming.api.watermark.Watermark;
-//import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-//import org.junit.Test;
-//import org.junit.runner.RunWith;
-//import org.powermock.core.classloader.annotations.PowerMockIgnore;
-//import org.powermock.core.classloader.annotations.PrepareForTest;
-//import org.powermock.modules.junit4.PowerMockRunner;
-//
-//import java.io.IOException;
-//import java.lang.reflect.Field;
-//
-//import static org.junit.Assert.assertEquals;
-//import static org.junit.Assert.assertTrue;
-//
-///**
-// * Tests for asynchronous checkpoints.
-// */
-//@RunWith(PowerMockRunner.class)
-//@PrepareForTest(ResultPartitionWriter.class)
-//@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"})
-//@SuppressWarnings("serial")
-//public class StreamTaskAsyncCheckpointTest {
-//
-//	/**
-//	 * This ensures that asynchronous state handles are actually materialized asynchonously.
-//	 *
-//	 * <p>We use latches to block at various stages and see if the code still continues through
-//	 * the parts that are not asynchronous. If the checkpoint is not done asynchronously the
-//	 * test will simply lock forever.
-//	 * @throws Exception
-//	 */
-//	@Test
-//	public void testAsyncCheckpoints() throws Exception {
-//		final OneShotLatch delayCheckpointLatch = new OneShotLatch();
-//		final OneShotLatch ensureCheckpointLatch = new OneShotLatch();
-//
-//		final OneInputStreamTask<String, String> task = new OneInputStreamTask<>();
-//
-//		final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<>(task, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
-//
-//		StreamConfig streamConfig = testHarness.getStreamConfig();
-//
-//		streamConfig.setStreamOperator(new AsyncCheckpointOperator());
-//
-//		StreamMockEnvironment mockEnv = new StreamMockEnvironment(
-//			testHarness.jobConfig,
-//			testHarness.taskConfig,
-//			testHarness.memorySize,
-//			new MockInputSplitProvider(),
-//			testHarness.bufferSize) {
-//
-//			@Override
-//			public ExecutionConfig getExecutionConfig() {
-//				return testHarness.executionConfig;
-//			}
-//
-//			@Override
-//			public void acknowledgeCheckpoint(long checkpointId) {
-//				super.acknowledgeCheckpoint(checkpointId);
-//			}
-//
-//			@Override
-//			public void acknowledgeCheckpoint(long checkpointId, StateHandle<?> state) {
-//				super.acknowledgeCheckpoint(checkpointId, state);
-//
-//				// block on the latch, to verify that triggerCheckpoint returns below,
-//				// even though the async checkpoint would not finish
-//				try {
-//					delayCheckpointLatch.await();
-//				} catch (InterruptedException e) {
-//					e.printStackTrace();
-//				}
-//
-//				assertTrue(state instanceof StreamTaskStateList);
-//				StreamTaskStateList stateList = (StreamTaskStateList) state;
-//
-//				// should be only one state
-//				StreamTaskState taskState = stateList.getState(this.getUserClassLoader())[0];
-//				StateHandle<?> operatorState = taskState.getOperatorState();
-//				assertTrue("It must be a TestStateHandle", operatorState instanceof TestStateHandle);
-//				TestStateHandle testState = (TestStateHandle) operatorState;
-//				assertEquals(42, testState.checkpointId);
-//				assertEquals(17, testState.timestamp);
-//
-//				// we now know that the checkpoint went through
-//				ensureCheckpointLatch.trigger();
-//			}
-//		};
-//
-//		testHarness.invoke(mockEnv);
-//
-//		// wait for the task to be running
-//		for (Field field: StreamTask.class.getDeclaredFields()) {
-//			if (field.getName().equals("isRunning")) {
-//				field.setAccessible(true);
-//				while (!field.getBoolean(task)) {
-//					Thread.sleep(10);
-//				}
-//
-//			}
-//		}
-//
-//		task.triggerCheckpoint(42, 17);
-//
-//		// now we allow the checkpoint
-//		delayCheckpointLatch.trigger();
-//
-//		// wait for the checkpoint to go through
-//		ensureCheckpointLatch.await();
-//
-//		testHarness.endInput();
-//		testHarness.waitForTaskCompletion();
-//	}
-//
-//
-//	// ------------------------------------------------------------------------
-//
-//	public static class AsyncCheckpointOperator
-//		extends AbstractStreamOperator<String>
-//		implements OneInputStreamOperator<String, String> {
-//		@Override
-//		public void processElement(StreamRecord<String> element) throws Exception {
-//			// we also don't care
-//		}
-//
-//		@Override
-//		public void processWatermark(Watermark mark) throws Exception {
-//			// not interested
-//		}
-//
-//
-//		@Override
-//		public StreamTaskState snapshotOperatorState(final long checkpointId, final long timestamp) throws Exception {
-//			StreamTaskState taskState = super.snapshotOperatorState(checkpointId, timestamp);
-//
-//			AsynchronousStateHandle<String> asyncState =
-//				new DataInputViewAsynchronousStateHandle(checkpointId, timestamp);
-//
-//			taskState.setOperatorState(asyncState);
-//
-//			return taskState;
-//		}
-//
-//		@Override
-//		public void restoreState(StreamTaskState taskState) throws Exception {
-//			super.restoreState(taskState);
-//		}
-//	}
-//
-//	private static class DataInputViewAsynchronousStateHandle extends AsynchronousStateHandle<String> {
-//
-//		private final long checkpointId;
-//		private final long timestamp;
-//
-//		public DataInputViewAsynchronousStateHandle(long checkpointId, long timestamp) {
-//			this.checkpointId = checkpointId;
-//			this.timestamp = timestamp;
-//		}
-//
-//		@Override
-//		public StateHandle<String> materialize() throws Exception {
-//			return new TestStateHandle(checkpointId, timestamp);
-//		}
-//
-//		@Override
-//		public long getStateSize() {
-//			return 0;
-//		}
-//
-//		@Override
-//		public void close() throws IOException {}
-//	}
-//
-//	private static class TestStateHandle implements StateHandle<String> {
-//
-//		public final long checkpointId;
-//		public final long timestamp;
-//
-//		public TestStateHandle(long checkpointId, long timestamp) {
-//			this.checkpointId = checkpointId;
-//			this.timestamp = timestamp;
-//		}
-//
-//		@Override
-//		public String getState(ClassLoader userCodeClassLoader) throws Exception {
-//			return null;
-//		}
-//
-//		@Override
-//		public void discardState() throws Exception {}
-//
-//		@Override
-//		public long getStateSize() {
-//			return 0;
-//		}
-//
-//		@Override
-//		public void close() throws IOException {}
-//	}
-//
-//	public static class DummyMapFunction<T> implements MapFunction<T, T> {
-//		@Override
-//		public T map(T value) { return value; }
-//	}
-//}