You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/10/16 18:08:47 UTC
[20/24] flink git commit: [hotfix] [streaming] Remove obsolete
internal state handle classes
[hotfix] [streaming] Remove obsolete internal state handle classes
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d68c8b1d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d68c8b1d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d68c8b1d
Branch: refs/heads/master
Commit: d68c8b1d59493923bc980ccadee30afcab1dfd35
Parents: 7c20543
Author: Stephan Ewen <se...@apache.org>
Authored: Sat Oct 10 03:15:15 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Oct 16 15:26:11 2015 +0200
----------------------------------------------------------------------
.../runtime/state/ByteStreamStateHandle.java | 100 ---------------
.../flink/runtime/state/FileStateHandle.java | 70 -----------
.../runtime/state/PartitionedStateHandle.java | 53 --------
.../state/ByteStreamStateHandleTest.java | 126 -------------------
4 files changed, 349 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/d68c8b1d/flink-runtime/src/main/java/org/apache/flink/runtime/state/ByteStreamStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ByteStreamStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ByteStreamStateHandle.java
deleted file mode 100644
index 7ecfe62..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ByteStreamStateHandle.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state;
-
-import org.apache.flink.util.InstantiationUtil;
-
-import java.io.InputStream;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.OutputStream;
-import java.io.Serializable;
-
-/**
- * Statehandle that writes/reads the contents of the serializable checkpointed
- * state to the provided input and outputstreams using default java
- * serialization.
- *
- */
-public abstract class ByteStreamStateHandle implements StateHandle<Serializable> {
-
- private static final long serialVersionUID = -962025800339325828L;
-
- private transient Serializable state;
- private boolean isWritten = false;
-
- public ByteStreamStateHandle(Serializable state) {
- if (state != null) {
- this.state = state;
- } else {
- throw new RuntimeException("State cannot be null");
- }
- }
-
- /**
- * The state will be written to the stream returned by this method.
- */
- protected abstract OutputStream getOutputStream() throws Exception;
-
- /**
- * The state will be read from the stream returned by this method.
- */
- protected abstract InputStream getInputStream() throws Exception;
-
- @Override
- public Serializable getState(ClassLoader userCodeClassLoader) throws Exception {
- if (!stateFetched()) {
- ObjectInputStream stream = new InstantiationUtil.ClassLoaderObjectInputStream(getInputStream(), userCodeClassLoader);
- try {
- state = (Serializable) stream.readObject();
- } finally {
- stream.close();
- }
- }
- return state;
- }
-
- private void writeObject(ObjectOutputStream oos) throws Exception {
- if (!isWritten) {
- ObjectOutputStream stream = new ObjectOutputStream(getOutputStream());
- try {
- stream.writeObject(state);
- isWritten = true;
- } finally {
- stream.close();
- }
- }
- oos.defaultWriteObject();
- }
-
- /**
- * Checks whether the state has already been fetched from the remote
- * storage.
- */
- public boolean stateFetched() {
- return state != null;
- }
-
- /**
- * Checks whether the state has already been written to the external store
- */
- public boolean isWritten() {
- return isWritten;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/d68c8b1d/flink-runtime/src/main/java/org/apache/flink/runtime/state/FileStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/FileStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/FileStateHandle.java
deleted file mode 100644
index c45990b..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/FileStateHandle.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.Serializable;
-import java.net.URI;
-import java.net.URISyntaxException;
-
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.util.StringUtils;
-
-import java.util.Random;
-
-/**
- * Statehandle that writes the checkpointed state to a random file in the
- * provided checkpoint directory. Any Flink supported File system can be used
- * but it is advised to use a filesystem that is persistent in case of node
- * failures, such as HDFS or Tachyon.
- *
- */
-public class FileStateHandle extends ByteStreamStateHandle {
-
- private static final long serialVersionUID = 1L;
-
- private String pathString;
-
- public FileStateHandle(Serializable state, String folder) {
- super(state);
- this.pathString = folder + "/" + randomString();
- }
-
- protected OutputStream getOutputStream() throws IOException, URISyntaxException {
- return FileSystem.get(new URI(pathString)).create(new Path(pathString), true);
- }
-
- protected InputStream getInputStream() throws IOException, URISyntaxException {
- return FileSystem.get(new URI(pathString)).open(new Path(pathString));
- }
-
- private String randomString() {
- final byte[] bytes = new byte[20];
- new Random().nextBytes(bytes);
- return StringUtils.byteToHexString(bytes);
- }
-
- @Override
- public void discardState() throws Exception {
- FileSystem.get(new URI(pathString)).delete(new Path(pathString), false);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/d68c8b1d/flink-runtime/src/main/java/org/apache/flink/runtime/state/PartitionedStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/PartitionedStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PartitionedStateHandle.java
deleted file mode 100644
index 9ec748b..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/PartitionedStateHandle.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state;
-
-import java.io.Serializable;
-import java.util.Map;
-
-/**
- * Wrapper for storing the handles for each state in a partitioned form. It can
- * be used to repartition the state before re-injecting to the tasks.
- *
- * TODO: This class needs testing!
- */
-public class PartitionedStateHandle implements
- StateHandle<Map<Serializable, StateHandle<Serializable>>> {
-
- private static final long serialVersionUID = 7505365403501402100L;
-
- Map<Serializable, StateHandle<Serializable>> handles;
-
- public PartitionedStateHandle(Map<Serializable, StateHandle<Serializable>> handles) {
- this.handles = handles;
- }
-
- @Override
- public Map<Serializable, StateHandle<Serializable>> getState(ClassLoader userCodeClassLoader) throws Exception {
- return handles;
- }
-
- @Override
- public void discardState() throws Exception {
- for (StateHandle<Serializable> handle : handles.values()) {
- handle.discardState();
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/d68c8b1d/flink-runtime/src/test/java/org/apache/flink/runtime/state/ByteStreamStateHandleTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ByteStreamStateHandleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ByteStreamStateHandleTest.java
deleted file mode 100644
index c667139..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ByteStreamStateHandleTest.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.Serializable;
-
-import org.apache.flink.util.InstantiationUtil;
-import org.junit.Test;
-
-public class ByteStreamStateHandleTest {
-
- @Test
- public void testHandle() throws Exception {
- final ClassLoader cl = this.getClass().getClassLoader();
- MockHandle handle;
-
- try {
- handle = new MockHandle(null);
- fail();
- } catch (RuntimeException e) {
- // expected behaviour
- }
-
- handle = new MockHandle(1);
-
- assertEquals(1, handle.getState(cl));
- assertTrue(handle.stateFetched());
- assertFalse(handle.isWritten());
- assertFalse(handle.discarded);
-
- MockHandle handleDs = serializeDeserialize(handle);
-
- assertEquals(1, handle.getState(cl));
- assertTrue(handle.stateFetched());
- assertTrue(handle.isWritten());
- assertTrue(handle.generatedOutput);
- assertFalse(handle.discarded);
-
- assertFalse(handleDs.stateFetched());
- assertTrue(handleDs.isWritten());
- assertFalse(handleDs.generatedOutput);
- assertFalse(handle.discarded);
-
- try {
- handleDs.getState(cl);
- fail();
- } catch (UnsupportedOperationException e) {
- // good
- }
-
- MockHandle handleDs2 = serializeDeserialize(handleDs);
-
- assertFalse(handleDs2.stateFetched());
- assertTrue(handleDs2.isWritten());
- assertFalse(handleDs.generatedOutput);
- assertFalse(handleDs2.generatedOutput);
- assertFalse(handleDs2.discarded);
-
- handleDs2.discardState();
- assertTrue(handleDs2.discarded);
-
- }
-
- @SuppressWarnings("unchecked")
- private <X extends StateHandle<?>> X serializeDeserialize(X handle) throws IOException,
- ClassNotFoundException {
- byte[] serialized = InstantiationUtil.serializeObject(handle);
- return (X) InstantiationUtil.deserializeObject(serialized, Thread.currentThread()
- .getContextClassLoader());
- }
-
- private static class MockHandle extends ByteStreamStateHandle {
-
- private static final long serialVersionUID = 1L;
-
- public MockHandle(Serializable state) {
- super(state);
- }
-
- boolean discarded = false;
- transient boolean generatedOutput = false;
-
- @Override
- public void discardState() throws Exception {
- discarded = true;
- }
-
- @Override
- protected OutputStream getOutputStream() throws Exception {
- generatedOutput = true;
- return new ByteArrayOutputStream();
- }
-
- @Override
- protected InputStream getInputStream() throws Exception {
- throw new UnsupportedOperationException();
- }
-
- }
-
-}