You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/10/16 18:08:47 UTC

[20/24] flink git commit: [hotfix] [streaming] Remove obsolete internal state handle classes

[hotfix] [streaming] Remove obsolete internal state handle classes


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d68c8b1d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d68c8b1d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d68c8b1d

Branch: refs/heads/master
Commit: d68c8b1d59493923bc980ccadee30afcab1dfd35
Parents: 7c20543
Author: Stephan Ewen <se...@apache.org>
Authored: Sat Oct 10 03:15:15 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Oct 16 15:26:11 2015 +0200

----------------------------------------------------------------------
 .../runtime/state/ByteStreamStateHandle.java    | 100 ---------------
 .../flink/runtime/state/FileStateHandle.java    |  70 -----------
 .../runtime/state/PartitionedStateHandle.java   |  53 --------
 .../state/ByteStreamStateHandleTest.java        | 126 -------------------
 4 files changed, 349 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d68c8b1d/flink-runtime/src/main/java/org/apache/flink/runtime/state/ByteStreamStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ByteStreamStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ByteStreamStateHandle.java
deleted file mode 100644
index 7ecfe62..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ByteStreamStateHandle.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state;
-
-import org.apache.flink.util.InstantiationUtil;
-
-import java.io.InputStream;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.OutputStream;
-import java.io.Serializable;
-
-/**
- * Statehandle that writes/reads the contents of the serializable checkpointed
- * state to the provided input and outputstreams using default java
- * serialization.
- * 
- */
-public abstract class ByteStreamStateHandle implements StateHandle<Serializable> {
-
-	private static final long serialVersionUID = -962025800339325828L;
-
-	private transient Serializable state;
-	private boolean isWritten = false;
-
-	public ByteStreamStateHandle(Serializable state) {
-		if (state != null) {
-			this.state = state;
-		} else {
-			throw new RuntimeException("State cannot be null");
-		}
-	}
-
-	/**
-	 * The state will be written to the stream returned by this method.
-	 */
-	protected abstract OutputStream getOutputStream() throws Exception;
-
-	/**
-	 * The state will be read from the stream returned by this method.
-	 */
-	protected abstract InputStream getInputStream() throws Exception;
-
-	@Override
-	public Serializable getState(ClassLoader userCodeClassLoader) throws Exception {
-		if (!stateFetched()) {
-			ObjectInputStream stream = new InstantiationUtil.ClassLoaderObjectInputStream(getInputStream(), userCodeClassLoader);
-			try {
-				state = (Serializable) stream.readObject();
-			} finally {
-				stream.close();
-			}
-		}
-		return state;
-	}
-
-	private void writeObject(ObjectOutputStream oos) throws Exception {
-		if (!isWritten) {
-			ObjectOutputStream stream = new ObjectOutputStream(getOutputStream());
-			try {
-				stream.writeObject(state);
-				isWritten = true;
-			} finally {
-				stream.close();
-			}
-		}
-		oos.defaultWriteObject();
-	}
-
-	/**
-	 * Checks whether the state has already been fetched from the remote
-	 * storage.
-	 */
-	public boolean stateFetched() {
-		return state != null;
-	}
-	
-	/**
-	 * Checks whether the state has already been written to the external store
-	 */
-	public boolean isWritten() {
-		return isWritten;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d68c8b1d/flink-runtime/src/main/java/org/apache/flink/runtime/state/FileStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/FileStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/FileStateHandle.java
deleted file mode 100644
index c45990b..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/FileStateHandle.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.Serializable;
-import java.net.URI;
-import java.net.URISyntaxException;
-
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.util.StringUtils;
-
-import java.util.Random;
-
-/**
- * Statehandle that writes the checkpointed state to a random file in the
- * provided checkpoint directory. Any Flink supported File system can be used
- * but it is advised to use a filesystem that is persistent in case of node
- * failures, such as HDFS or Tachyon.
- * 
- */
-public class FileStateHandle extends ByteStreamStateHandle {
-
-	private static final long serialVersionUID = 1L;
-
-	private String pathString;
-
-	public FileStateHandle(Serializable state, String folder) {
-		super(state);
-		this.pathString = folder + "/" + randomString();
-	}
-
-	protected OutputStream getOutputStream() throws IOException, URISyntaxException {
-		return FileSystem.get(new URI(pathString)).create(new Path(pathString), true);
-	}
-
-	protected InputStream getInputStream() throws IOException, URISyntaxException {
-		return FileSystem.get(new URI(pathString)).open(new Path(pathString));
-	}
-
-	private String randomString() {
-		final byte[] bytes = new byte[20];
-		new Random().nextBytes(bytes);
-		return StringUtils.byteToHexString(bytes);
-	}
-
-	@Override
-	public void discardState() throws Exception {
-		FileSystem.get(new URI(pathString)).delete(new Path(pathString), false);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d68c8b1d/flink-runtime/src/main/java/org/apache/flink/runtime/state/PartitionedStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/PartitionedStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PartitionedStateHandle.java
deleted file mode 100644
index 9ec748b..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/PartitionedStateHandle.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state;
-
-import java.io.Serializable;
-import java.util.Map;
-
-/**
- * Wrapper for storing the handles for each state in a partitioned form. It can
- * be used to repartition the state before re-injecting to the tasks.
- *
- * TODO: This class needs testing!
- */
-public class PartitionedStateHandle implements
-		StateHandle<Map<Serializable, StateHandle<Serializable>>> {
-
-	private static final long serialVersionUID = 7505365403501402100L;
-
-	Map<Serializable, StateHandle<Serializable>> handles;
-
-	public PartitionedStateHandle(Map<Serializable, StateHandle<Serializable>> handles) {
-		this.handles = handles;
-	}
-
-	@Override
-	public Map<Serializable, StateHandle<Serializable>> getState(ClassLoader userCodeClassLoader) throws Exception {
-		return handles;
-	}
-
-	@Override
-	public void discardState() throws Exception {
-		for (StateHandle<Serializable> handle : handles.values()) {
-			handle.discardState();
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d68c8b1d/flink-runtime/src/test/java/org/apache/flink/runtime/state/ByteStreamStateHandleTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ByteStreamStateHandleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ByteStreamStateHandleTest.java
deleted file mode 100644
index c667139..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ByteStreamStateHandleTest.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.Serializable;
-
-import org.apache.flink.util.InstantiationUtil;
-import org.junit.Test;
-
-public class ByteStreamStateHandleTest {
-
-	@Test
-	public void testHandle() throws Exception {
-		final ClassLoader cl = this.getClass().getClassLoader();
-		MockHandle handle;
-
-		try {
-			handle = new MockHandle(null);
-			fail();
-		} catch (RuntimeException e) {
-			// expected behaviour
-		}
-
-		handle = new MockHandle(1);
-
-		assertEquals(1, handle.getState(cl));
-		assertTrue(handle.stateFetched());
-		assertFalse(handle.isWritten());
-		assertFalse(handle.discarded);
-
-		MockHandle handleDs = serializeDeserialize(handle);
-
-		assertEquals(1, handle.getState(cl));
-		assertTrue(handle.stateFetched());
-		assertTrue(handle.isWritten());
-		assertTrue(handle.generatedOutput);
-		assertFalse(handle.discarded);
-
-		assertFalse(handleDs.stateFetched());
-		assertTrue(handleDs.isWritten());
-		assertFalse(handleDs.generatedOutput);
-		assertFalse(handle.discarded);
-
-		try {
-			handleDs.getState(cl);
-			fail();
-		} catch (UnsupportedOperationException e) {
-			// good
-		}
-
-		MockHandle handleDs2 = serializeDeserialize(handleDs);
-
-		assertFalse(handleDs2.stateFetched());
-		assertTrue(handleDs2.isWritten());
-		assertFalse(handleDs.generatedOutput);
-		assertFalse(handleDs2.generatedOutput);
-		assertFalse(handleDs2.discarded);
-
-		handleDs2.discardState();
-		assertTrue(handleDs2.discarded);
-
-	}
-
-	@SuppressWarnings("unchecked")
-	private <X extends StateHandle<?>> X serializeDeserialize(X handle) throws IOException,
-			ClassNotFoundException {
-		byte[] serialized = InstantiationUtil.serializeObject(handle);
-		return (X) InstantiationUtil.deserializeObject(serialized, Thread.currentThread()
-				.getContextClassLoader());
-	}
-
-	private static class MockHandle extends ByteStreamStateHandle {
-
-		private static final long serialVersionUID = 1L;
-
-		public MockHandle(Serializable state) {
-			super(state);
-		}
-
-		boolean discarded = false;
-		transient boolean generatedOutput = false;
-
-		@Override
-		public void discardState() throws Exception {
-			discarded = true;
-		}
-
-		@Override
-		protected OutputStream getOutputStream() throws Exception {
-			generatedOutput = true;
-			return new ByteArrayOutputStream();
-		}
-
-		@Override
-		protected InputStream getInputStream() throws Exception {
-			throw new UnsupportedOperationException();
-		}
-
-	}
-
-}