You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/01/22 22:12:09 UTC
[2/9] flink git commit: [hotfix] Remove no longer used Generic State
classes
[hotfix] Remove no longer used Generic State classes
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/254a7007
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/254a7007
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/254a7007
Branch: refs/heads/master
Commit: 254a70072aa3bbd51277e99e0982c5e29908d684
Parents: 51c02ee
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Jan 13 14:52:40 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Jan 22 21:22:21 2017 +0100
----------------------------------------------------------------------
.../runtime/state/GenericFoldingState.java | 80 -------------------
.../flink/runtime/state/GenericListState.java | 84 --------------------
.../runtime/state/GenericReducingState.java | 83 -------------------
3 files changed, 247 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/254a7007/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericFoldingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericFoldingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericFoldingState.java
deleted file mode 100644
index ee2d86d..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericFoldingState.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state;
-
-import org.apache.flink.api.common.functions.FoldFunction;
-import org.apache.flink.api.common.state.FoldingState;
-import org.apache.flink.api.common.state.ValueState;
-
-/**
- * Generic implementation of {@link FoldingState} based on a wrapped {@link ValueState}.
- *
- * @param <N> The type of the namespace.
- * @param <T> The type of the values that can be folded into the state.
- * @param <ACC> The type of the value in the folding state.
- * @param <W> Generic type that extends both the underlying {@code ValueState} and {@code KvState}.
- */
-public class GenericFoldingState<N, T, ACC, W extends ValueState<ACC> & KvState<N>>
- implements FoldingState<T, ACC>, KvState<N> {
-
- private final W wrappedState;
- private final FoldFunction<T, ACC> foldFunction;
-
- /**
- * Creates a new {@code FoldingState} that wraps the given {@link ValueState}. The
- * {@code ValueState} must have the initial value of the fold as default value.
- *
- * @param wrappedState The wrapped {@code ValueState}
- * @param foldFunction The {@code FoldFunction} to use for folding values into the state
- */
- @SuppressWarnings("unchecked")
- public GenericFoldingState(ValueState<ACC> wrappedState, FoldFunction<T, ACC> foldFunction) {
- if (!(wrappedState instanceof KvState)) {
- throw new IllegalArgumentException("Wrapped state must be a KvState.");
- }
- this.wrappedState = (W) wrappedState;
- this.foldFunction = foldFunction;
- }
-
- @Override
- public void setCurrentNamespace(N namespace) {
- wrappedState.setCurrentNamespace(namespace);
- }
-
- @Override
- public byte[] getSerializedValue(byte[] serializedKeyAndNamespace) throws Exception {
- return wrappedState.getSerializedValue(serializedKeyAndNamespace);
- }
-
- @Override
- public ACC get() throws Exception {
- return wrappedState.value();
- }
-
- @Override
- public void add(T value) throws Exception {
- ACC currentValue = wrappedState.value();
- wrappedState.update(foldFunction.fold(currentValue, value));
- }
-
- @Override
- public void clear() {
- wrappedState.clear();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/254a7007/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericListState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericListState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericListState.java
deleted file mode 100644
index ba81837..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericListState.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state;
-
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ValueState;
-
-import java.util.ArrayList;
-
-/**
- * Generic implementation of {@link ListState} based on a wrapped {@link ValueState}.
- *
- * @param <N> The type of the namespace.
- * @param <T> The type of the values stored in this {@code ListState}.
- * @param <W> Generic type that extends both the underlying {@code ValueState} and {@code KvState}.
- */
-public class GenericListState<N, T, W extends ValueState<ArrayList<T>> & KvState<N>>
- implements ListState<T>, KvState<N> {
-
- private final W wrappedState;
-
- /**
- * Creates a new {@code ListState} that wraps the given {@link ValueState}. The
- * {@code ValueState} must have a default value of {@code null}.
- *
- * @param wrappedState The wrapped {@code ValueState}
- */
- @SuppressWarnings("unchecked")
- public GenericListState(ValueState<ArrayList<T>> wrappedState) {
- if (!(wrappedState instanceof KvState)) {
- throw new IllegalArgumentException("Wrapped state must be a KvState.");
- }
- this.wrappedState = (W) wrappedState;
- }
-
- @Override
- public void setCurrentNamespace(N namespace) {
- wrappedState.setCurrentNamespace(namespace);
- }
-
- @Override
- public byte[] getSerializedValue(byte[] serializedKeyAndNamespace) throws Exception {
- return wrappedState.getSerializedValue(serializedKeyAndNamespace);
- }
-
- @Override
- public Iterable<T> get() throws Exception {
- return wrappedState.value();
- }
-
- @Override
- public void add(T value) throws Exception {
- ArrayList<T> currentValue = wrappedState.value();
- if (currentValue == null) {
- currentValue = new ArrayList<>();
- currentValue.add(value);
- wrappedState.update(currentValue);
- } else {
- currentValue.add(value);
- wrappedState.update(currentValue);
- }
- }
-
- @Override
- public void clear() {
- wrappedState.clear();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/254a7007/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericReducingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericReducingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericReducingState.java
deleted file mode 100644
index 214231e..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericReducingState.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.state.ReducingState;
-import org.apache.flink.api.common.state.ValueState;
-
-/**
- * Generic implementation of {@link ReducingState} based on a wrapped {@link ValueState}.
- *
- * @param <N> The type of the namespace.
- * @param <T> The type of the values stored in this {@code ReducingState}.
- * @param <W> Generic type that extends both the underlying {@code ValueState} and {@code KvState}.
- */
-public class GenericReducingState<N, T, W extends ValueState<T> & KvState<N>>
- implements ReducingState<T>, KvState<N> {
-
- private final W wrappedState;
- private final ReduceFunction<T> reduceFunction;
-
- /**
- * Creates a new {@code ReducingState} that wraps the given {@link ValueState}. The
- * {@code ValueState} must have a default value of {@code null}.
- *
- * @param wrappedState The wrapped {@code ValueState}
- * @param reduceFunction The {@code ReduceFunction} to use for combining values.
- */
- @SuppressWarnings("unchecked")
- public GenericReducingState(ValueState<T> wrappedState, ReduceFunction<T> reduceFunction) {
- if (!(wrappedState instanceof KvState)) {
- throw new IllegalArgumentException("Wrapped state must be a KvState.");
- }
- this.wrappedState = (W) wrappedState;
- this.reduceFunction = reduceFunction;
- }
-
- @Override
- public void setCurrentNamespace(N namespace) {
- wrappedState.setCurrentNamespace(namespace);
- }
-
- @Override
- public byte[] getSerializedValue(byte[] serializedKeyAndNamespace) throws Exception {
- return wrappedState.getSerializedValue(serializedKeyAndNamespace);
- }
-
- @Override
- public T get() throws Exception {
- return wrappedState.value();
- }
-
- @Override
- public void add(T value) throws Exception {
- T currentValue = wrappedState.value();
- if (currentValue == null) {
- wrappedState.update(value);
- } else {
- wrappedState.update(reduceFunction.reduce(currentValue, value));
- }
- }
-
- @Override
- public void clear() {
- wrappedState.clear();
- }
-}