Posted to issues@flink.apache.org by greghogan <gi...@git.apache.org> on 2016/04/16 16:36:10 UTC

[GitHub] flink pull request: [FLINK-3771] [gelly] Methods for translating G...

GitHub user greghogan opened a pull request:

    https://github.com/apache/flink/pull/1900

    [FLINK-3771] [gelly] Methods for translating Graphs

    Methods for translation of the type or value of graph labels, vertex values, and edge values.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/greghogan/flink 3771_methods_for_translating_graphs

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1900.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1900
    
----
commit 0f9d0ed4d7ca8bcaa4651339cacf49ed1eec3667
Author: Greg Hogan <co...@greghogan.com>
Date:   2016-04-16T11:06:44Z

    [FLINK-3771] [gelly] Methods for translating Graphs
    
    Methods for translation of the type or value of graph labels, vertex
    values, and edge values.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3771] [gelly] Methods for translating G...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/1900



[GitHub] flink pull request: [FLINK-3771] [gelly] Methods for translating G...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1900#discussion_r61552437
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/Translate.java ---
    @@ -0,0 +1,346 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.asm.translate;
    +
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
    +import org.apache.flink.api.java.operators.translation.WrappingFunction;
    +import org.apache.flink.api.java.typeutils.TupleTypeInfo;
    +import org.apache.flink.api.java.typeutils.TypeExtractor;
    +import org.apache.flink.graph.Edge;
    +import org.apache.flink.graph.Vertex;
    +import org.apache.flink.util.Preconditions;
    +
    +import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
    +import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_UNKNOWN;
    +
    +/**
    + * Methods for translation of the type or modification of the data of graph
    + * labels, vertex values, and edge values.
    + */
    +public class Translate {
    +
    +	// --------------------------------------------------------------------------------------------
    +	//  Translate vertex labels
    +	// --------------------------------------------------------------------------------------------
    +
    +	/**
    +	 * Translate {@link Vertex} labels using the given {@link MapFunction}.
    +	 *
    +	 * @param vertices input vertices
    +	 * @param translator implements conversion from {@code OLD} to {@code NEW}
    +	 * @param <OLD> old vertex label type
    +	 * @param <NEW> new vertex label type
    +	 * @param <VV> vertex value type
    +	 * @return translated vertices
    +	 */
    +	public static <OLD,NEW,VV> DataSet<Vertex<NEW,VV>> translateVertexLabels(DataSet<Vertex<OLD,VV>> vertices, MapFunction<OLD,NEW> translator) {
    +		return translateVertexLabels(vertices, translator, PARALLELISM_UNKNOWN);
    +	}
    +
    +	/**
    +	 * Translate {@link Vertex} labels using the given {@link MapFunction}.
    +	 *
    +	 * @param vertices input vertices
    +	 * @param translator implements conversion from {@code OLD} to {@code NEW}
    +	 * @param parallelism operator parallelism
    +	 * @param <OLD> old vertex label type
    +	 * @param <NEW> new vertex label type
    +	 * @param <VV> vertex value type
    +	 * @return translated vertices
    +	 */
    +	@SuppressWarnings("unchecked")
    +	public static <OLD,NEW,VV> DataSet<Vertex<NEW,VV>> translateVertexLabels(DataSet<Vertex<OLD,VV>> vertices, MapFunction<OLD,NEW> translator, int parallelism) {
    +		Preconditions.checkNotNull(vertices);
    +		Preconditions.checkNotNull(translator);
    +		Preconditions.checkArgument(parallelism > 0 || parallelism == PARALLELISM_DEFAULT || parallelism == PARALLELISM_UNKNOWN,
    +			"The parallelism must be greater than zero.");
    +
    +		Class<Vertex<NEW,VV>> vertexClass = (Class<Vertex<NEW,VV>>)(Class<? extends Vertex>) Vertex.class;
    +		TypeInformation<NEW> newType = TypeExtractor.createTypeInfo(MapFunction.class, translator.getClass(), 1, null, null);
    +		TypeInformation<VV> vertexValueType = ((TupleTypeInfo<Vertex<OLD,VV>>) vertices.getType()).getTypeAt(1);
    +
    +		TupleTypeInfo<Vertex<NEW,VV>> returnType = new TupleTypeInfo<>(vertexClass, newType, vertexValueType);
    +
    +		return vertices
    +			.map(new TranslateVertexLabel<OLD,NEW,VV>(translator))
    +			.returns(returnType)
    +				.setParallelism(parallelism)
    +				.name("Translate vertex labels");
    +	}
    +
    +	/**
    +	 * Translate {@link Vertex} labels using the given {@link MapFunction}.
    +	 *
    +	 * @param <OLD> old vertex label type
    +	 * @param <NEW> new vertex label type
    +	 * @param <VV> vertex value type
    +	 */
    +	@ForwardedFields("1")
    +	private static class TranslateVertexLabel<OLD,NEW,VV>
    +	extends WrappingFunction<MapFunction<OLD,NEW>>
    +	implements MapFunction<Vertex<OLD,VV>, Vertex<NEW,VV>> {
    +		private Vertex<NEW,VV> vertex = new Vertex<>();
    +
    +		public TranslateVertexLabel(MapFunction<OLD,NEW> translator) {
    +			super(translator);
    +		}
    +
    +		@Override
    +		public Vertex<NEW,VV> map(Vertex<OLD,VV> value)
    +				throws Exception {
    --- End diff --
    
    `throws Exception` can be moved up to the line above.



[GitHub] flink pull request: [FLINK-3771] [gelly] Methods for translating G...

Posted by greghogan <gi...@git.apache.org>.
Github user greghogan commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1900#discussion_r61083553
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/translate/Translate.java ---
    @@ -0,0 +1,362 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.translate;
    +
    +import org.apache.flink.api.common.ExecutionConfig;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
    +import org.apache.flink.api.java.typeutils.TupleTypeInfo;
    +import org.apache.flink.graph.Edge;
    +import org.apache.flink.graph.Graph;
    +import org.apache.flink.graph.Vertex;
    +
    +/**
    + * Methods for translation of the type or modification of the data of graph
    + * labels, vertex values, and edge values.
    + */
    +public class Translate {
    +
    +	// --------------------------------------------------------------------------------------------
    +	//  Translate graph labels
    +	// --------------------------------------------------------------------------------------------
    +
    +	/**
    +	 * Relabels {@link Vertex Vertices} and {@link Edge}s of a {@link Graph} using the given {@link Translator}.
    +	 *
    +	 * @param graph input graph
    +	 * @param translator implements conversion from {@code OLD} to {@code NEW}
    +	 * @param <OLD> old graph label type
    +	 * @param <NEW> new graph label type
    +	 * @param <VV> vertex value type
    +	 * @param <EV> edge value type
    +	 * @return translated graph
    +	 */
    +	public static <OLD,NEW,VV,EV> Graph<NEW,VV,EV> translateGraphLabels(Graph<OLD,VV,EV> graph, Translator<OLD,NEW> translator) {
    +		return translateGraphLabels(graph, translator, ExecutionConfig.PARALLELISM_UNKNOWN);
    +	}
    +
    +	/**
    +	 * Relabels {@link Vertex Vertices} and {@link Edge}s of a {@link Graph} using the given {@link Translator}.
    +	 *
    +	 * @param graph input graph
    +	 * @param translator implements conversion from {@code OLD} to {@code NEW}
    +	 * @param parallelism operator parallelism
    +	 * @param <OLD> old graph label type
    +	 * @param <NEW> new graph label type
    +	 * @param <VV> vertex value type
    +	 * @param <EV> edge value type
    +	 * @return translated graph
    +	 */
    +	public static <OLD,NEW,VV,EV> Graph<NEW,VV,EV> translateGraphLabels(Graph<OLD,VV,EV> graph, Translator<OLD,NEW> translator, int parallelism) {
    +		// Vertices
    +		DataSet<Vertex<NEW,VV>> translatedVertices = translateVertexLabels(graph.getVertices(), translator, parallelism);
    +
    +		// Edges
    +		DataSet<Edge<NEW,EV>> translatedEdges = translateEdgeLabels(graph.getEdges(), translator, parallelism);
    +
    +		// Graph
    +		return Graph.fromDataSet(translatedVertices, translatedEdges, graph.getContext());
    +	}
    +
    +	// --------------------------------------------------------------------------------------------
    +	//  Translate vertex labels
    +	// --------------------------------------------------------------------------------------------
    +
    +	/**
    +	 * Translate {@link Vertex} labels using the given {@link Translator}.
    +	 *
    +	 * @param vertices input vertices
    +	 * @param translator implements conversion from {@code OLD} to {@code NEW}
    +	 * @param <OLD> old vertex label type
    +	 * @param <NEW> new vertex label type
    +	 * @param <VV> vertex value type
    +	 * @return translated vertices
    +	 */
    +	public static <OLD,NEW,VV> DataSet<Vertex<NEW,VV>> translateVertexLabels(DataSet<Vertex<OLD,VV>> vertices, Translator<OLD,NEW> translator) {
    +		return translateVertexLabels(vertices, translator, ExecutionConfig.PARALLELISM_UNKNOWN);
    +	}
    +
    +	/**
    +	 * Translate {@link Vertex} labels using the given {@link Translator}.
    +	 *
    +	 * @param vertices input vertices
    +	 * @param translator implements conversion from {@code OLD} to {@code NEW}
    +	 * @param parallelism operator parallelism
    +	 * @param <OLD> old vertex label type
    +	 * @param <NEW> new vertex label type
    +	 * @param <VV> vertex value type
    +	 * @return translated vertices
    +	 */
    +	@SuppressWarnings("unchecked")
    +	public static <OLD,NEW,VV> DataSet<Vertex<NEW,VV>> translateVertexLabels(DataSet<Vertex<OLD,VV>> vertices, Translator<OLD,NEW> translator, int parallelism) {
    +		Class<Vertex<NEW,VV>> vertexClass = (Class<Vertex<NEW,VV>>)(Class<? extends Vertex>) Vertex.class;
    +		TypeInformation<NEW> newType = translator.getTypeHint().getTypeInfo();
    +		TypeInformation<VV> vertexValueType = ((TupleTypeInfo<Vertex<OLD,VV>>) vertices.getType()).getTypeAt(1);
    +
    +		TupleTypeInfo<Vertex<NEW,VV>> returnType = new TupleTypeInfo<>(vertexClass, newType, vertexValueType);
    +
    +		return vertices
    +			.map(new TranslateVertexLabel<OLD,NEW,VV>(translator))
    +			.returns(returnType)
    +				.setParallelism(parallelism)
    +				.name("Translate vertex labels");
    +	}
    +
    +	/**
    +	 * Translate {@link Vertex} labels using the given {@link Translator}.
    +	 *
    +	 * @param <OLD> old vertex label type
    +	 * @param <NEW> new vertex label type
    +	 * @param <VV> vertex value type
    +	 */
    +	@ForwardedFields("1")
    +	private static class TranslateVertexLabel<OLD,NEW,VV>
    +	implements MapFunction<Vertex<OLD,VV>, Vertex<NEW,VV>> {
    +		private final Translator<OLD,NEW> translator;
    +
    +		private Vertex<NEW,VV> vertex = new Vertex<>();
    +
    +		public TranslateVertexLabel(Translator<OLD,NEW> translator) {
    +			this.translator = translator;
    +		}
    +
    +		@Override
    +		public Vertex<NEW,VV> map(Vertex<OLD,VV> value)
    +				throws Exception {
    +			vertex.f0 = translator.translate(value.f0, vertex.f0);
    +			vertex.f1 = value.f1;
    +
    +			return vertex;
    +		}
    +	}
    +
    +	// --------------------------------------------------------------------------------------------
    +	//  Translate edge labels
    +	// --------------------------------------------------------------------------------------------
    +
    +	/**
    +	 * Translate {@link Edge} labels using the given {@link Translator}.
    +	 *
    +	 * @param edges input edges
    +	 * @param translator implements conversion from {@code OLD} to {@code NEW}
    +	 * @param <OLD> old edge label type
    +	 * @param <NEW> new edge label type
    +	 * @param <EV> edge value type
    +	 * @return translated edges
    +	 */
    +	public static <OLD,NEW,EV> DataSet<Edge<NEW,EV>> translateEdgeLabels(DataSet<Edge<OLD,EV>> edges, Translator<OLD,NEW> translator) {
    +		return translateEdgeLabels(edges, translator, ExecutionConfig.PARALLELISM_UNKNOWN);
    +	}
    +
    +	/**
    +	 * Translate {@link Edge} labels using the given {@link Translator}.
    +	 *
    +	 * @param edges input edges
    +	 * @param translator implements conversion from {@code OLD} to {@code NEW}
    +	 * @param parallelism operator parallelism
    +	 * @param <OLD> old edge label type
    +	 * @param <NEW> new edge label type
    +	 * @param <EV> edge value type
    +	 * @return translated edges
    +	 */
    +	@SuppressWarnings("unchecked")
    +	public static <OLD,NEW,EV> DataSet<Edge<NEW,EV>> translateEdgeLabels(DataSet<Edge<OLD,EV>> edges, Translator<OLD,NEW> translator, int parallelism) {
    +		Class<Edge<NEW,EV>> edgeClass = (Class<Edge<NEW,EV>>)(Class<? extends Edge>) Edge.class;
    +		TypeInformation<NEW> newType = translator.getTypeHint().getTypeInfo();
    +		TypeInformation<EV> edgeValueType = ((TupleTypeInfo<Edge<OLD,EV>>) edges.getType()).getTypeAt(2);
    +
    +		TupleTypeInfo<Edge<NEW,EV>> returnType = new TupleTypeInfo<>(edgeClass, newType, newType, edgeValueType);
    +
    +		return edges
    +			.map(new TranslateEdgeLabel<OLD,NEW,EV>(translator))
    +			.returns(returnType)
    +				.setParallelism(parallelism)
    +				.name("Translate edge labels");
    +	}
    +
    +	/**
    +	 * Translate {@link Edge} labels using the given {@link Translator}.
    +	 *
    +	 * @param <OLD> old edge label type
    +	 * @param <NEW> new edge label type
    +	 * @param <EV> edge value type
    +	 */
    +	@ForwardedFields("2")
    +	private static class TranslateEdgeLabel<OLD,NEW,EV>
    +	implements MapFunction<Edge<OLD,EV>, Edge<NEW,EV>> {
    +		private final Translator<OLD,NEW> translator;
    +
    +		private Edge<NEW,EV> edge = new Edge<>();
    +
    +		public TranslateEdgeLabel(Translator<OLD,NEW> translator) {
    +			this.translator = translator;
    +		}
    +
    +		@Override
    +		public Edge<NEW,EV> map(Edge<OLD,EV> value)
    +				throws Exception {
    +			edge.f0 = translator.translate(value.f0, edge.f0);
    +			edge.f1 = translator.translate(value.f1, edge.f1);
    +			edge.f2 = value.f2;
    +
    +			return edge;
    +		}
    +	}
    +
    +	// --------------------------------------------------------------------------------------------
    +	//  Translate vertex values
    +	// --------------------------------------------------------------------------------------------
    +
    +	/**
    +	 * Translate {@link Vertex} values using the given {@link Translator}.
    +	 *
    +	 * @param vertices input vertices
    +	 * @param <K> vertex label type
    +	 * @param <OLD> old vertex value type
    +	 * @param <NEW> new vertex value type
    +	 * @return translated vertices
    +	 */
    +	public static <K,OLD,NEW> DataSet<Vertex<K,NEW>> translateVertexValues(DataSet<Vertex<K,OLD>> vertices, Translator<OLD,NEW> translator) {
    +		return translateVertexValues(vertices, translator, ExecutionConfig.PARALLELISM_UNKNOWN);
    +	}
    +
    +	/**
    +	 * Translate {@link Vertex} values using the given {@link Translator}.
    +	 *
    +	 * @param vertices source vertices
    +	 * @param parallelism operator parallelism
    +	 * @param <K> vertex label type
    +	 * @param <OLD> old vertex value type
    +	 * @param <NEW> new vertex value type
    +	 * @return translated vertices
    +	 */
    +	@SuppressWarnings("unchecked")
    +	public static <K,OLD,NEW> DataSet<Vertex<K,NEW>> translateVertexValues(DataSet<Vertex<K,OLD>> vertices, Translator<OLD,NEW> translator, int parallelism) {
    +		Class<Vertex<K,NEW>> vertexClass = (Class<Vertex<K,NEW>>)(Class<? extends Vertex>) Vertex.class;
    +		TypeInformation<K> labelType = ((TupleTypeInfo<Vertex<K,OLD>>) vertices.getType()).getTypeAt(0);
    +		TypeInformation<NEW> newType = translator.getTypeHint().getTypeInfo();
    +
    +		TupleTypeInfo<Vertex<K,NEW>> returnType = new TupleTypeInfo<>(vertexClass, labelType, newType);
    +
    +		return vertices
    +			.map(new TranslateVertexValue<K,OLD,NEW>(translator))
    +			.returns(returnType)
    +				.setParallelism(parallelism)
    +				.name("Translate vertex values");
    +	}
    +
    +	/**
    +	 * Translate {@link Vertex} values using the given {@link Translator}.
    +	 *
    +	 * @param <K> vertex label type
    +	 * @param <OLD> old vertex value type
    +	 * @param <NEW> new vertex value type
    +	 */
    +	@ForwardedFields("0")
    +	private static class TranslateVertexValue<K,OLD,NEW>
    +	implements MapFunction<Vertex<K,OLD>, Vertex<K,NEW>> {
    +		private final Translator<OLD,NEW> translator;
    +
    +		private Vertex<K,NEW> vertex = new Vertex<>();
    +
    +		public TranslateVertexValue(Translator<OLD,NEW> translator) {
    +			this.translator = translator;
    +		}
    +
    +		@Override
    +		public Vertex<K,NEW> map(Vertex<K,OLD> value)
    +				throws Exception {
    +			vertex.f0 = value.f0;
    +			vertex.f1 = translator.translate(value.f1, vertex.f1);;
    --- End diff --
    
    Noting the double semi-colons here and below.



[GitHub] flink pull request: [FLINK-3771] [gelly] Methods for translating G...

Posted by greghogan <gi...@git.apache.org>.
Github user greghogan commented on the pull request:

    https://github.com/apache/flink/pull/1900#issuecomment-214825734
  
    By reuse I meant that, because the current `Graph` methods operate at the level of `Vertex` and `Edge`, translating values requires separate `MapVertexValueLongToString` and `MapEdgeValueLongToString` functions, and translating labels then requires `MapVertexLabelLongToString` and `MapEdgeLabelLongToString` as well. In this PR a single `LongToString` can be used to translate vertex labels, edge labels, vertex values, and edge values.
    
    I agree that the `Translate` methods taking a `Graph` as input would be better as methods on `Graph`.
    
    And if we don't implement the translators as `MapFunction`, we still need `Translate` to handle the `TypeInformation`.
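
The reuse point in the comment above can be illustrated with a minimal, Flink-free sketch. Everything here is an assumption made for illustration: `TranslateSketch`, its nested `Vertex` and `Edge` classes, and `LONG_TO_STRING` are hypothetical stand-ins, plain `List`s replace `DataSet`s, and `java.util.function.Function` plays the role of the element-level translator. It is not the code from the pull request; it only shows how one element-level function can serve all four positions once the wrapping is done by helper methods.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

public class TranslateSketch {

    /** Hypothetical stand-in for a Gelly vertex: (label, value). */
    public static final class Vertex<K, VV> {
        public final K label;
        public final VV value;

        public Vertex(K label, VV value) {
            this.label = label;
            this.value = value;
        }
    }

    /** Hypothetical stand-in for a Gelly edge: (source, target, value). */
    public static final class Edge<K, EV> {
        public final K source;
        public final K target;
        public final EV value;

        public Edge(K source, K target, EV value) {
            this.source = source;
            this.target = target;
            this.value = value;
        }
    }

    /** One element-level translator, written once and reused below. */
    public static final Function<Long, String> LONG_TO_STRING = value -> Long.toString(value);

    public static <OLD, NEW, VV> List<Vertex<NEW, VV>> translateVertexLabels(
            List<Vertex<OLD, VV>> vertices, Function<OLD, NEW> translator) {
        List<Vertex<NEW, VV>> result = new ArrayList<>();
        for (Vertex<OLD, VV> v : vertices) {
            // Only the label is translated; the value is forwarded unchanged.
            result.add(new Vertex<>(translator.apply(v.label), v.value));
        }
        return result;
    }

    public static <OLD, NEW, EV> List<Edge<NEW, EV>> translateEdgeLabels(
            List<Edge<OLD, EV>> edges, Function<OLD, NEW> translator) {
        List<Edge<NEW, EV>> result = new ArrayList<>();
        for (Edge<OLD, EV> e : edges) {
            // Both endpoints are translated; the edge value is forwarded.
            result.add(new Edge<>(translator.apply(e.source), translator.apply(e.target), e.value));
        }
        return result;
    }

    public static <K, OLD, NEW> List<Vertex<K, NEW>> translateVertexValues(
            List<Vertex<K, OLD>> vertices, Function<OLD, NEW> translator) {
        List<Vertex<K, NEW>> result = new ArrayList<>();
        for (Vertex<K, OLD> v : vertices) {
            result.add(new Vertex<>(v.label, translator.apply(v.value)));
        }
        return result;
    }

    public static <K, OLD, NEW> List<Edge<K, NEW>> translateEdgeValues(
            List<Edge<K, OLD>> edges, Function<OLD, NEW> translator) {
        List<Edge<K, NEW>> result = new ArrayList<>();
        for (Edge<K, OLD> e : edges) {
            result.add(new Edge<>(e.source, e.target, translator.apply(e.value)));
        }
        return result;
    }

    public static void main(String[] args) {
        List<Vertex<Long, Long>> vertices = Arrays.asList(
                new Vertex<Long, Long>(1L, 10L), new Vertex<Long, Long>(2L, 20L));
        List<Edge<Long, Long>> edges = Arrays.asList(new Edge<Long, Long>(1L, 2L, 5L));

        // The same LONG_TO_STRING instance is used in all four positions.
        System.out.println(translateVertexLabels(vertices, LONG_TO_STRING).get(0).label);
        System.out.println(translateEdgeLabels(edges, LONG_TO_STRING).get(0).target);
        System.out.println(translateVertexValues(vertices, LONG_TO_STRING).get(1).value);
        System.out.println(translateEdgeValues(edges, LONG_TO_STRING).get(0).value);
    }
}
```

With functions written against `Vertex` and `Edge` directly, each of the four cases would need its own class. In this sketch the per-position wrapping lives in the helper methods instead. The sketch leaves out the type-information handling that the comment says `Translate` must still provide in Flink.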



[GitHub] flink pull request: [FLINK-3771] [gelly] Methods for translating G...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1900#discussion_r61551561
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/Translate.java ---
    @@ -0,0 +1,346 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.asm.translate;
    +
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
    +import org.apache.flink.api.java.operators.translation.WrappingFunction;
    +import org.apache.flink.api.java.typeutils.TupleTypeInfo;
    +import org.apache.flink.api.java.typeutils.TypeExtractor;
    +import org.apache.flink.graph.Edge;
    +import org.apache.flink.graph.Vertex;
    +import org.apache.flink.util.Preconditions;
    +
    +import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
    +import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_UNKNOWN;
    +
    +/**
    + * Methods for translation of the type or modification of the data of graph
    + * labels, vertex values, and edge values.
    + */
    +public class Translate {
    +
    +	// --------------------------------------------------------------------------------------------
    +	//  Translate vertex labels
    +	// --------------------------------------------------------------------------------------------
    +
    +	/**
    +	 * Translate {@link Vertex} labels using the given {@link MapFunction}.
    +	 *
    +	 * @param vertices input vertices
    +	 * @param translator implements conversion from {@code OLD} to {@code NEW}
    +	 * @param <OLD> old vertex label type
    +	 * @param <NEW> new vertex label type
    +	 * @param <VV> vertex value type
    +	 * @return translated vertices
    +	 */
    +	public static <OLD,NEW,VV> DataSet<Vertex<NEW,VV>> translateVertexLabels(DataSet<Vertex<OLD,VV>> vertices, MapFunction<OLD,NEW> translator) {
    +		return translateVertexLabels(vertices, translator, PARALLELISM_UNKNOWN);
    +	}
    +
    +	/**
    +	 * Translate {@link Vertex} labels using the given {@link MapFunction}.
    +	 *
    +	 * @param vertices input vertices
    +	 * @param translator implements conversion from {@code OLD} to {@code NEW}
    +	 * @param parallelism operator parallelism
    +	 * @param <OLD> old vertex label type
    +	 * @param <NEW> new vertex label type
    +	 * @param <VV> vertex value type
    +	 * @return translated vertices
    +	 */
    +	@SuppressWarnings("unchecked")
    +	public static <OLD,NEW,VV> DataSet<Vertex<NEW,VV>> translateVertexLabels(DataSet<Vertex<OLD,VV>> vertices, MapFunction<OLD,NEW> translator, int parallelism) {
    --- End diff --
    
    Missing spaces (e.g. after the commas in `<OLD,NEW,VV>`).



[GitHub] flink pull request: [FLINK-3771] [gelly] Methods for translating G...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1900#discussion_r61551105
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/LongValueAddOffset.java ---
    @@ -0,0 +1,50 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.asm.translate;
    +
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.types.LongValue;
    +
    +/**
    + * Translate {@link LongValue} by adding a constant offset value.
    + */
    +public class LongValueAddOffset
    +implements MapFunction<LongValue, LongValue> {
    --- End diff --
    
    The `implements` clause can go on the line above.



[GitHub] flink pull request: [FLINK-3771] [gelly] Methods for translating G...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1900#discussion_r61551910
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/Translate.java ---
    @@ -0,0 +1,346 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.asm.translate;
    +
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
    +import org.apache.flink.api.java.operators.translation.WrappingFunction;
    +import org.apache.flink.api.java.typeutils.TupleTypeInfo;
    +import org.apache.flink.api.java.typeutils.TypeExtractor;
    +import org.apache.flink.graph.Edge;
    +import org.apache.flink.graph.Vertex;
    +import org.apache.flink.util.Preconditions;
    +
    +import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
    +import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_UNKNOWN;
    +
    +/**
    + * Methods for translation of the type or modification of the data of graph
    + * labels, vertex values, and edge values.
    + */
    +public class Translate {
    +
    +	// --------------------------------------------------------------------------------------------
    +	//  Translate vertex labels
    +	// --------------------------------------------------------------------------------------------
    +
    +	/**
    +	 * Translate {@link Vertex} labels using the given {@link MapFunction}.
    +	 *
    +	 * @param vertices input vertices
    +	 * @param translator implements conversion from {@code OLD} to {@code NEW}
    +	 * @param <OLD> old vertex label type
    +	 * @param <NEW> new vertex label type
    +	 * @param <VV> vertex value type
    +	 * @return translated vertices
    +	 */
    +	public static <OLD,NEW,VV> DataSet<Vertex<NEW,VV>> translateVertexLabels(DataSet<Vertex<OLD,VV>> vertices, MapFunction<OLD,NEW> translator) {
    +		return translateVertexLabels(vertices, translator, PARALLELISM_UNKNOWN);
    +	}
    +
    +	/**
    +	 * Translate {@link Vertex} labels using the given {@link MapFunction}.
    +	 *
    +	 * @param vertices input vertices
    +	 * @param translator implements conversion from {@code OLD} to {@code NEW}
    +	 * @param parallelism operator parallelism
    +	 * @param <OLD> old vertex label type
    +	 * @param <NEW> new vertex label type
    +	 * @param <VV> vertex value type
    +	 * @return translated vertices
    +	 */
    +	@SuppressWarnings("unchecked")
    +	public static <OLD,NEW,VV> DataSet<Vertex<NEW,VV>> translateVertexLabels(DataSet<Vertex<OLD,VV>> vertices, MapFunction<OLD,NEW> translator, int parallelism) {
    +		Preconditions.checkNotNull(vertices);
    +		Preconditions.checkNotNull(translator);
    +		Preconditions.checkArgument(parallelism > 0 || parallelism == PARALLELISM_DEFAULT || parallelism == PARALLELISM_UNKNOWN,
    +			"The parallelism must be greater than zero.");
    +
    +		Class<Vertex<NEW,VV>> vertexClass = (Class<Vertex<NEW,VV>>)(Class<? extends Vertex>) Vertex.class;
    +		TypeInformation<NEW> newType = TypeExtractor.createTypeInfo(MapFunction.class, translator.getClass(), 1, null, null);
    +		TypeInformation<VV> vertexValueType = ((TupleTypeInfo<Vertex<OLD,VV>>) vertices.getType()).getTypeAt(1);
    +
    +		TupleTypeInfo<Vertex<NEW,VV>> returnType = new TupleTypeInfo<>(vertexClass, newType, vertexValueType);
    +
    +		return vertices
    +			.map(new TranslateVertexLabel<OLD,NEW,VV>(translator))
    +			.returns(returnType)
    +				.setParallelism(parallelism)
    +				.name("Translate vertex labels");
    +	}
    +
    +	/**
    +	 * Translate {@link Vertex} labels using the given {@link MapFunction}.
    +	 *
    +	 * @param <OLD> old vertex label type
    +	 * @param <NEW> new vertex label type
    +	 * @param <VV> vertex value type
    +	 */
    +	@ForwardedFields("1")
    +	private static class TranslateVertexLabel<OLD,NEW,VV>
    +	extends WrappingFunction<MapFunction<OLD,NEW>>
    --- End diff --
    
    formatting is off and spaces are missing



[GitHub] flink pull request: [FLINK-3771] [gelly] Methods for translating G...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1900#discussion_r61551292
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/LongValueAddOffset.java ---
    @@ -0,0 +1,50 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.asm.translate;
    +
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.types.LongValue;
    +
    +/**
    + * Translate {@link LongValue} by adding a constant offset value.
    + */
    +public class LongValueAddOffset
    +implements MapFunction<LongValue, LongValue> {
    +
    +	private final long offset;
    +
    +	private LongValue output = new LongValue();
    +
    +	/**
    +	 * Translate {@link LongValue} by adding a constant offset value.
    +	 *
    +	 * @param offset value to be added to each element
    +	 */
    +	public LongValueAddOffset(long offset) {
    +		this.offset = offset;
    +	}
    +
    +	@Override
    +	public LongValue map(LongValue value)
    +			throws Exception {
    +		output.setValue(offset + value.getValue());
    +
    --- End diff --
    
    empty line can be removed



[GitHub] flink pull request: [FLINK-3771] [gelly] Methods for translating G...

Posted by greghogan <gi...@git.apache.org>.
Github user greghogan commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1900#discussion_r61582214
  
    --- Diff: flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala ---
    @@ -407,6 +407,36 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
       }
     
       /**
    +   * Relabels vertices and edges using the given MapFunction.
    +   *
    +   * @param translator implements conversion from K to NEW
    +   * @return relabeled graph
    +   */
    +  def translateGraphLabels[NEW: TypeInformation : ClassTag](translator: MapFunction[K, NEW]): Graph[NEW,VV,EV] = {
    --- End diff --
    
    Fixed in next push.



[GitHub] flink pull request: [FLINK-3771] [gelly] Methods for translating G...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on the pull request:

    https://github.com/apache/flink/pull/1900#issuecomment-214733082
  
    Hi @greghogan,
    I looked at the PR in more detail and I have some thoughts.
    First, why do we need a `Translate` class and `Translator` interface? Can't we simply add methods in the `Graph` class that wrap a `MapFunction` or receive it as an argument like in `mapVertices`?
    Second, translating values is already supported by `mapVertices` and `mapEdges`. Couldn't we simply extend these methods to also support changing the vertex key types?
    Let me know what you think! Thanks.



[GitHub] flink pull request: [FLINK-3771] [gelly] Methods for translating G...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1900#discussion_r61551647
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/Translate.java ---
    @@ -0,0 +1,346 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.asm.translate;
    +
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
    +import org.apache.flink.api.java.operators.translation.WrappingFunction;
    +import org.apache.flink.api.java.typeutils.TupleTypeInfo;
    +import org.apache.flink.api.java.typeutils.TypeExtractor;
    +import org.apache.flink.graph.Edge;
    +import org.apache.flink.graph.Vertex;
    +import org.apache.flink.util.Preconditions;
    +
    +import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
    +import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_UNKNOWN;
    +
    +/**
    + * Methods for translation of the type or modification of the data of graph
    + * labels, vertex values, and edge values.
    + */
    +public class Translate {
    +
    +	// --------------------------------------------------------------------------------------------
    +	//  Translate vertex labels
    +	// --------------------------------------------------------------------------------------------
    +
    +	/**
    +	 * Translate {@link Vertex} labels using the given {@link MapFunction}.
    +	 *
    +	 * @param vertices input vertices
    +	 * @param translator implements conversion from {@code OLD} to {@code NEW}
    +	 * @param <OLD> old vertex label type
    +	 * @param <NEW> new vertex label type
    +	 * @param <VV> vertex value type
    +	 * @return translated vertices
    +	 */
    +	public static <OLD,NEW,VV> DataSet<Vertex<NEW,VV>> translateVertexLabels(DataSet<Vertex<OLD,VV>> vertices, MapFunction<OLD,NEW> translator) {
    +		return translateVertexLabels(vertices, translator, PARALLELISM_UNKNOWN);
    +	}
    +
    +	/**
    +	 * Translate {@link Vertex} labels using the given {@link MapFunction}.
    +	 *
    +	 * @param vertices input vertices
    +	 * @param translator implements conversion from {@code OLD} to {@code NEW}
    +	 * @param parallelism operator parallelism
    +	 * @param <OLD> old vertex label type
    +	 * @param <NEW> new vertex label type
    +	 * @param <VV> vertex value type
    +	 * @return translated vertices
    +	 */
    +	@SuppressWarnings("unchecked")
    +	public static <OLD,NEW,VV> DataSet<Vertex<NEW,VV>> translateVertexLabels(DataSet<Vertex<OLD,VV>> vertices, MapFunction<OLD,NEW> translator, int parallelism) {
    +		Preconditions.checkNotNull(vertices);
    +		Preconditions.checkNotNull(translator);
    +		Preconditions.checkArgument(parallelism > 0 || parallelism == PARALLELISM_DEFAULT || parallelism == PARALLELISM_UNKNOWN,
    +			"The parallelism must be greater than zero.");
    +
    +		Class<Vertex<NEW,VV>> vertexClass = (Class<Vertex<NEW,VV>>)(Class<? extends Vertex>) Vertex.class;
    +		TypeInformation<NEW> newType = TypeExtractor.createTypeInfo(MapFunction.class, translator.getClass(), 1, null, null);
    +		TypeInformation<VV> vertexValueType = ((TupleTypeInfo<Vertex<OLD,VV>>) vertices.getType()).getTypeAt(1);
    +
    +		TupleTypeInfo<Vertex<NEW,VV>> returnType = new TupleTypeInfo<>(vertexClass, newType, vertexValueType);
    +
    +		return vertices
    +			.map(new TranslateVertexLabel<OLD,NEW,VV>(translator))
    +			.returns(returnType)
    +				.setParallelism(parallelism)
    --- End diff --
    
    indentation is off



[GitHub] flink pull request: [FLINK-3771] [gelly] Methods for translating G...

Posted by greghogan <gi...@git.apache.org>.
Github user greghogan commented on the pull request:

    https://github.com/apache/flink/pull/1900#issuecomment-215444763
  
    In Java 8 this is `graph.run(new TranslateEdgeValues<>(new LongValueToStringValue()))`; see "Target Types" in https://docs.oracle.com/javase/tutorial/java/generics/genTypeInference.html.
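
The target-typing point can be illustrated without Flink. The following is a minimal sketch under stated assumptions: `MiniGraph`, `TranslateValues`, and `LongToString` are hypothetical stand-ins invented for this example (they are not Gelly classes), and `java.util.function.Function` takes the place of `MapFunction`. It only shows that a diamond in argument position can pick up its type arguments from the parameter type of `run(...)`.

```java
import java.util.function.Function;

final class TargetTypingSketch {

    /** Hypothetical stand-in for an algorithm that translates edge values from OLD to NEW. */
    static final class TranslateValues<K, OLD, NEW> {
        private final Function<OLD, NEW> translator;

        TranslateValues(Function<OLD, NEW> translator) {
            this.translator = translator;
        }

        NEW apply(OLD value) {
            return translator.apply(value);
        }
    }

    /** Hypothetical stand-in for a graph with key type K and a single edge value of type EV. */
    static final class MiniGraph<K, EV> {
        private final EV edgeValue;

        MiniGraph(EV edgeValue) {
            this.edgeValue = edgeValue;
        }

        /** Runs the algorithm; K and EV are fixed by this graph, NEW is inferred. */
        <NEW> MiniGraph<K, NEW> run(TranslateValues<K, EV, NEW> algorithm) {
            return new MiniGraph<>(algorithm.apply(edgeValue));
        }

        EV edgeValue() {
            return edgeValue;
        }
    }

    /** Hypothetical translator, loosely analogous to a Long-to-String mapping. */
    static final class LongToString implements Function<Long, String> {
        @Override
        public String apply(Long value) {
            return Long.toString(value);
        }
    }

    static String demo() {
        MiniGraph<Integer, Long> graph = new MiniGraph<>(42L);
        // The diamond has no explicit type arguments: K comes from the parameter
        // type of run(...), OLD and NEW from the constructor argument.
        MiniGraph<Integer, String> translated =
            graph.run(new TranslateValues<>(new LongToString()));
        return translated.edgeValue();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

With a compiler that lacks Java 8 target typing, the same call would most likely need the type arguments written out, for example `new TranslateValues<Integer, Long, String>(...)`.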



[GitHub] flink pull request: [FLINK-3771] [gelly] Methods for translating G...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1900#discussion_r61552592
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateEdgeValues.java ---
    @@ -0,0 +1,81 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.asm.translate;
    +
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.graph.Edge;
    +import org.apache.flink.graph.Graph;
    +import org.apache.flink.graph.GraphAlgorithm;
    +import org.apache.flink.util.Preconditions;
    +
    +import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
    +import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_UNKNOWN;
    +import static org.apache.flink.graph.asm.translate.Translate.translateEdgeValues;
    +
    +/**
    + * Translate {@link Edge} values using the given {@link MapFunction}.
    + *
    + * @param <K> vertex label type
    + * @param <VV> vertex value type
    + * @param <OLD> old edge value type
    + * @param <NEW> new edge value type
    + */
    +public class TranslateEdgeValues<K, VV, OLD, NEW>
    +implements GraphAlgorithm<K, VV, OLD, Graph<K,VV,NEW>> {
    --- End diff --
    
    can be moved to the previous line



[GitHub] flink pull request: [FLINK-3771] [gelly] Methods for translating G...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1900#discussion_r61552416
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/Translate.java ---
    @@ -0,0 +1,346 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.asm.translate;
    +
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
    +import org.apache.flink.api.java.operators.translation.WrappingFunction;
    +import org.apache.flink.api.java.typeutils.TupleTypeInfo;
    +import org.apache.flink.api.java.typeutils.TypeExtractor;
    +import org.apache.flink.graph.Edge;
    +import org.apache.flink.graph.Vertex;
    +import org.apache.flink.util.Preconditions;
    +
    +import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
    +import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_UNKNOWN;
    +
    +/**
    + * Methods for translation of the type or modification of the data of graph
    + * labels, vertex values, and edge values.
    + */
    +public class Translate {
    +
    +	// --------------------------------------------------------------------------------------------
    +	//  Translate vertex labels
    +	// --------------------------------------------------------------------------------------------
    +
    +	/**
    +	 * Translate {@link Vertex} labels using the given {@link MapFunction}.
    +	 *
    +	 * @param vertices input vertices
    +	 * @param translator implements conversion from {@code OLD} to {@code NEW}
    +	 * @param <OLD> old vertex label type
    +	 * @param <NEW> new vertex label type
    +	 * @param <VV> vertex value type
    +	 * @return translated vertices
    +	 */
    +	public static <OLD,NEW,VV> DataSet<Vertex<NEW,VV>> translateVertexLabels(DataSet<Vertex<OLD,VV>> vertices, MapFunction<OLD,NEW> translator) {
    +		return translateVertexLabels(vertices, translator, PARALLELISM_UNKNOWN);
    +	}
    +
    +	/**
    +	 * Translate {@link Vertex} labels using the given {@link MapFunction}.
    +	 *
    +	 * @param vertices input vertices
    +	 * @param translator implements conversion from {@code OLD} to {@code NEW}
    +	 * @param parallelism operator parallelism
    +	 * @param <OLD> old vertex label type
    +	 * @param <NEW> new vertex label type
    +	 * @param <VV> vertex value type
    +	 * @return translated vertices
    +	 */
    +	@SuppressWarnings("unchecked")
    +	public static <OLD,NEW,VV> DataSet<Vertex<NEW,VV>> translateVertexLabels(DataSet<Vertex<OLD,VV>> vertices, MapFunction<OLD,NEW> translator, int parallelism) {
    +		Preconditions.checkNotNull(vertices);
    +		Preconditions.checkNotNull(translator);
    +		Preconditions.checkArgument(parallelism > 0 || parallelism == PARALLELISM_DEFAULT || parallelism == PARALLELISM_UNKNOWN,
    +			"The parallelism must be greater than zero.");
    +
    +		Class<Vertex<NEW,VV>> vertexClass = (Class<Vertex<NEW,VV>>)(Class<? extends Vertex>) Vertex.class;
    +		TypeInformation<NEW> newType = TypeExtractor.createTypeInfo(MapFunction.class, translator.getClass(), 1, null, null);
    +		TypeInformation<VV> vertexValueType = ((TupleTypeInfo<Vertex<OLD,VV>>) vertices.getType()).getTypeAt(1);
    +
    +		TupleTypeInfo<Vertex<NEW,VV>> returnType = new TupleTypeInfo<>(vertexClass, newType, vertexValueType);
    +
    +		return vertices
    +			.map(new TranslateVertexLabel<OLD,NEW,VV>(translator))
    +			.returns(returnType)
    +				.setParallelism(parallelism)
    +				.name("Translate vertex labels");
    +	}
    +
    +	/**
    +	 * Translate {@link Vertex} labels using the given {@link MapFunction}.
    +	 *
    +	 * @param <OLD> old vertex label type
    +	 * @param <NEW> new vertex label type
    +	 * @param <VV> vertex value type
    +	 */
    +	@ForwardedFields("1")
    +	private static class TranslateVertexLabel<OLD,NEW,VV>
    +	extends WrappingFunction<MapFunction<OLD,NEW>>
    +	implements MapFunction<Vertex<OLD,VV>, Vertex<NEW,VV>> {
    +		private Vertex<NEW,VV> vertex = new Vertex<>();
    +
    +		public TranslateVertexLabel(MapFunction<OLD,NEW> translator) {
    --- End diff --
    
    make the constructor private?



[GitHub] flink pull request: [FLINK-3771] [gelly] Methods for translating G...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1900#discussion_r61552700
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateEdgeValues.java ---
    @@ -0,0 +1,81 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.asm.translate;
    +
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.graph.Edge;
    +import org.apache.flink.graph.Graph;
    +import org.apache.flink.graph.GraphAlgorithm;
    +import org.apache.flink.util.Preconditions;
    +
    +import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
    +import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_UNKNOWN;
    +import static org.apache.flink.graph.asm.translate.Translate.translateEdgeValues;
    +
    +/**
    + * Translate {@link Edge} values using the given {@link MapFunction}.
    + *
    + * @param <K> vertex label type
    + * @param <VV> vertex value type
    + * @param <OLD> old edge value type
    + * @param <NEW> new edge value type
    + */
    +public class TranslateEdgeValues<K, VV, OLD, NEW>
    +implements GraphAlgorithm<K, VV, OLD, Graph<K,VV,NEW>> {
    +
    +	// Required configuration
    +	private MapFunction<OLD,NEW> translator;
    +
    +	// Optional configuration
    +	private int parallelism = PARALLELISM_UNKNOWN;
    +
    +	/**
    +	 * Translate {@link Edge} values using the given {@link MapFunction}.
    +	 *
    +	 * @param translator implements conversion from {@code OLD} to {@code NEW}
    +	 */
    +	public TranslateEdgeValues(MapFunction<OLD,NEW> translator) {
    +		Preconditions.checkNotNull(translator);
    +
    +		this.translator = translator;
    +	}
    +
    +	/**
    +	 * Override the operator parallelism.
    +	 *
    +	 * @param parallelism operator parallelism
    +	 * @return this
    +	 */
    +	public TranslateEdgeValues<K,VV,OLD,NEW> setParallelism(int parallelism) {
    +		Preconditions.checkArgument(parallelism > 0 || parallelism == PARALLELISM_DEFAULT || parallelism == PARALLELISM_UNKNOWN,
    +			"The parallelism must be greater than zero.");
    +
    +		this.parallelism = parallelism;
    +
    --- End diff --
    
    empty line



[GitHub] flink pull request: [FLINK-3771] [gelly] Methods for translating G...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1900#discussion_r61551265
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/LongValueToStringValue.java ---
    @@ -0,0 +1,39 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.asm.translate;
    +
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.types.LongValue;
    +import org.apache.flink.types.StringValue;
    +
    +/**
    + * Translate {@link LongValue} to {@link StringValue}.
    + */
    +public class LongValueToStringValue
    +implements MapFunction<LongValue, StringValue> {
    --- End diff --
    
    same :)



[GitHub] flink pull request: [FLINK-3771] [gelly] Methods for translating G...

Posted by greghogan <gi...@git.apache.org>.
Github user greghogan commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1900#discussion_r61579289
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/Translate.java ---
    @@ -0,0 +1,346 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.asm.translate;
    +
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
    +import org.apache.flink.api.java.operators.translation.WrappingFunction;
    +import org.apache.flink.api.java.typeutils.TupleTypeInfo;
    +import org.apache.flink.api.java.typeutils.TypeExtractor;
    +import org.apache.flink.graph.Edge;
    +import org.apache.flink.graph.Vertex;
    +import org.apache.flink.util.Preconditions;
    +
    +import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
    +import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_UNKNOWN;
    +
    +/**
    + * Methods for translation of the type or modification of the data of graph
    + * labels, vertex values, and edge values.
    + */
    +public class Translate {
    +
    +	// --------------------------------------------------------------------------------------------
    +	//  Translate vertex labels
    +	// --------------------------------------------------------------------------------------------
    +
    +	/**
    +	 * Translate {@link Vertex} labels using the given {@link MapFunction}.
    +	 *
    +	 * @param vertices input vertices
    +	 * @param translator implements conversion from {@code OLD} to {@code NEW}
    +	 * @param <OLD> old vertex label type
    +	 * @param <NEW> new vertex label type
    +	 * @param <VV> vertex value type
    +	 * @return translated vertices
    +	 */
    +	public static <OLD,NEW,VV> DataSet<Vertex<NEW,VV>> translateVertexLabels(DataSet<Vertex<OLD,VV>> vertices, MapFunction<OLD,NEW> translator) {
    +		return translateVertexLabels(vertices, translator, PARALLELISM_UNKNOWN);
    +	}
    +
    +	/**
    +	 * Translate {@link Vertex} labels using the given {@link MapFunction}.
    +	 *
    +	 * @param vertices input vertices
    +	 * @param translator implements conversion from {@code OLD} to {@code NEW}
    +	 * @param parallelism operator parallelism
    +	 * @param <OLD> old vertex label type
    +	 * @param <NEW> new vertex label type
    +	 * @param <VV> vertex value type
    +	 * @return translated vertices
    +	 */
    +	@SuppressWarnings("unchecked")
    +	public static <OLD,NEW,VV> DataSet<Vertex<NEW,VV>> translateVertexLabels(DataSet<Vertex<OLD,VV>> vertices, MapFunction<OLD,NEW> translator, int parallelism) {
    +		Preconditions.checkNotNull(vertices);
    +		Preconditions.checkNotNull(translator);
    +		Preconditions.checkArgument(parallelism > 0 || parallelism == PARALLELISM_DEFAULT || parallelism == PARALLELISM_UNKNOWN,
    +			"The parallelism must be greater than zero.");
    +
    +		Class<Vertex<NEW,VV>> vertexClass = (Class<Vertex<NEW,VV>>)(Class<? extends Vertex>) Vertex.class;
    +		TypeInformation<NEW> newType = TypeExtractor.createTypeInfo(MapFunction.class, translator.getClass(), 1, null, null);
    +		TypeInformation<VV> vertexValueType = ((TupleTypeInfo<Vertex<OLD,VV>>) vertices.getType()).getTypeAt(1);
    +
    +		TupleTypeInfo<Vertex<NEW,VV>> returnType = new TupleTypeInfo<>(vertexClass, newType, vertexValueType);
    +
    +		return vertices
    +			.map(new TranslateVertexLabel<OLD,NEW,VV>(translator))
    +			.returns(returnType)
    +				.setParallelism(parallelism)
    +				.name("Translate vertex labels");
    +	}
    +
    +	/**
    +	 * Translate {@link Vertex} labels using the given {@link MapFunction}.
    +	 *
    +	 * @param <OLD> old vertex label type
    +	 * @param <NEW> new vertex label type
    +	 * @param <VV> vertex value type
    +	 */
    +	@ForwardedFields("1")
    +	private static class TranslateVertexLabel<OLD,NEW,VV>
    +	extends WrappingFunction<MapFunction<OLD,NEW>>
    +	implements MapFunction<Vertex<OLD,VV>, Vertex<NEW,VV>> {
    +		private Vertex<NEW,VV> vertex = new Vertex<>();
    +
    +		public TranslateVertexLabel(MapFunction<OLD,NEW> translator) {
    --- End diff --
    
    We could. I don't see a strong reason to do this since the class visibility is already private.


[GitHub] flink pull request: [FLINK-3771] [gelly] Methods for translating G...

Posted by greghogan <gi...@git.apache.org>.
Github user greghogan commented on the pull request:

    https://github.com/apache/flink/pull/1900#issuecomment-215167381
  
    The switch to `MapFunction` means that a user can use a `RichMapFunction` with open, close, accumulators, etc.
    
    Rather than add six new methods to `Graph`, I implemented these as three `GraphAlgorithm`s, which are run through the Gellyish `Graph.run(GraphAlgorithm)`.


[GitHub] flink pull request: [FLINK-3771] [gelly] Methods for translating G...

Posted by greghogan <gi...@git.apache.org>.
Github user greghogan commented on the pull request:

    https://github.com/apache/flink/pull/1900#issuecomment-215440722
  
    There's no reason we can't have both algorithms and methods on `Graph` which run the algorithm and ignore customization. I'll add this shortly.


[GitHub] flink pull request: [FLINK-3771] [gelly] Methods for translating G...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1900#discussion_r61551238
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/LongValueToIntValue.java ---
    @@ -0,0 +1,47 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.asm.translate;
    +
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.types.IntValue;
    +import org.apache.flink.types.LongValue;
    +
    +/**
    + * Translate {@link LongValue} to {@link IntValue}.
    + *
    + * Throws {@link RuntimeException} for integer overflow.
    + */
    +public class LongValueToIntValue
    +implements MapFunction<LongValue, IntValue> {
    +
    +	private IntValue output = new IntValue();
    +
    +	@Override
    +	public IntValue map(LongValue value)
    +			throws Exception {
    --- End diff --
    
    can be moved to the line above
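
The Javadoc of the quoted class states that integer overflow raises a `RuntimeException`. A minimal sketch of such a check, assuming plain `long` and `int` instead of Flink's `LongValue` and `IntValue`, and using `Math.toIntExact` from the JDK (the class name `LongToIntSketch` is hypothetical, and this is not necessarily how the PR implements it):

```java
final class LongToIntSketch {

    // Narrow a long to an int, failing loudly instead of silently truncating.
    static int toInt(long value) {
        // Math.toIntExact throws ArithmeticException (a RuntimeException)
        // when the value does not fit in 32 bits.
        return Math.toIntExact(value);
    }

    public static void main(String[] args) {
        System.out.println(toInt(123L));
        try {
            toInt(Integer.MAX_VALUE + 1L);
        } catch (ArithmeticException e) {
            System.out.println("overflow rejected: " + e.getMessage());
        }
    }
}
```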


[GitHub] flink pull request: [FLINK-3771] [gelly] Methods for translating G...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on the pull request:

    https://github.com/apache/flink/pull/1900#issuecomment-214819709
  
    Implementations of `MapFunction`s can also be reused :)
    What I was thinking was to provide the translator simply as map functions, e.g. like `Tuple2ToVertexMap`. We can add them to `org.apache.flink.graph.utils` or create a subpackage for that. Then we can add the `translate*` methods to `Graph`. If someone wants to use the provided translator on a dataset of vertices or edges, they can simply do this with a mapper.
    
    My concern is that we should try to be consistent with the existing Gelly API. e.g. something like `graph.translateIds(new LongValueToStringValue())` is Gelly-like, while
    `Translate.translateGraphLabels(graph, new LongValueToStringValue())` is not. Also, I think that this feature is simple enough to be implemented as a collection of map functions instead of a separate utility.
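
The reuse argument above can be sketched without Flink on the classpath. In this sketch `java.util.function.Function` stands in for Flink's `MapFunction`, and `SketchVertex`, `ReusableTranslatorSketch`, `translateIds`, and `translateValues` are hypothetical names, not Gelly API. It only illustrates that one translator written once can serve both IDs and values:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Stand-in for Gelly's Vertex<K, VV>; NOT the Flink class.
final class SketchVertex<K, VV> {
    final K id;
    final VV value;

    SketchVertex(K id, VV value) {
        this.id = id;
        this.value = value;
    }
}

final class ReusableTranslatorSketch {

    // One translator, written once, in the spirit of LongValueToStringValue.
    static final Function<Long, String> LONG_TO_STRING = v -> Long.toString(v);

    // Apply a translator to the ID of every vertex, keeping the value.
    static <OLD, NEW, VV> List<SketchVertex<NEW, VV>> translateIds(
            List<SketchVertex<OLD, VV>> vertices, Function<OLD, NEW> translator) {
        List<SketchVertex<NEW, VV>> result = new ArrayList<>();
        for (SketchVertex<OLD, VV> v : vertices) {
            result.add(new SketchVertex<>(translator.apply(v.id), v.value));
        }
        return result;
    }

    // Apply a translator to the value of every vertex, keeping the ID.
    static <K, OLD, NEW> List<SketchVertex<K, NEW>> translateValues(
            List<SketchVertex<K, OLD>> vertices, Function<OLD, NEW> translator) {
        List<SketchVertex<K, NEW>> result = new ArrayList<>();
        for (SketchVertex<K, OLD> v : vertices) {
            result.add(new SketchVertex<>(v.id, translator.apply(v.value)));
        }
        return result;
    }

    public static void main(String[] args) {
        List<SketchVertex<Long, Long>> vertices = new ArrayList<>();
        vertices.add(new SketchVertex<>(1L, 10L));
        vertices.add(new SketchVertex<>(2L, 20L));

        // The same translator instance is used for IDs and for values.
        List<SketchVertex<String, Long>> byId = translateIds(vertices, LONG_TO_STRING);
        List<SketchVertex<Long, String>> byValue = translateValues(vertices, LONG_TO_STRING);
        System.out.println(byId.get(0).id + " / " + byValue.get(1).value);
    }
}
```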


[GitHub] flink pull request: [FLINK-3771] [gelly] Methods for translating G...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on the pull request:

    https://github.com/apache/flink/pull/1900#issuecomment-215012658
  
    How about we add the translate methods to the `Graph` class and implement translators as `MapFunction<Old, New>`? This way they can be used even if you don't have a graph but only a dataset of vertices or edges. There is no requirement to have the mappers operate on the vertex and edge level.
    Does this sound like a good idea?


[GitHub] flink pull request: [FLINK-3771] [gelly] Methods for translating G...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1900#discussion_r61552498
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/Translate.java ---
    @@ -0,0 +1,346 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.asm.translate;
    +
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
    +import org.apache.flink.api.java.operators.translation.WrappingFunction;
    +import org.apache.flink.api.java.typeutils.TupleTypeInfo;
    +import org.apache.flink.api.java.typeutils.TypeExtractor;
    +import org.apache.flink.graph.Edge;
    +import org.apache.flink.graph.Vertex;
    +import org.apache.flink.util.Preconditions;
    +
    +import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
    +import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_UNKNOWN;
    +
    +/**
    + * Methods for translation of the type or modification of the data of graph
    + * labels, vertex values, and edge values.
    + */
    +public class Translate {
    +
    +	// --------------------------------------------------------------------------------------------
    +	//  Translate vertex labels
    +	// --------------------------------------------------------------------------------------------
    +
    +	/**
    +	 * Translate {@link Vertex} labels using the given {@link MapFunction}.
    +	 *
    +	 * @param vertices input vertices
    +	 * @param translator implements conversion from {@code OLD} to {@code NEW}
    +	 * @param <OLD> old vertex label type
    +	 * @param <NEW> new vertex label type
    +	 * @param <VV> vertex value type
    +	 * @return translated vertices
    +	 */
    +	public static <OLD,NEW,VV> DataSet<Vertex<NEW,VV>> translateVertexLabels(DataSet<Vertex<OLD,VV>> vertices, MapFunction<OLD,NEW> translator) {
    +		return translateVertexLabels(vertices, translator, PARALLELISM_UNKNOWN);
    +	}
    +
    +	/**
    +	 * Translate {@link Vertex} labels using the given {@link MapFunction}.
    +	 *
    +	 * @param vertices input vertices
    +	 * @param translator implements conversion from {@code OLD} to {@code NEW}
    +	 * @param parallelism operator parallelism
    +	 * @param <OLD> old vertex label type
    +	 * @param <NEW> new vertex label type
    +	 * @param <VV> vertex value type
    +	 * @return translated vertices
    +	 */
    +	@SuppressWarnings("unchecked")
    +	public static <OLD,NEW,VV> DataSet<Vertex<NEW,VV>> translateVertexLabels(DataSet<Vertex<OLD,VV>> vertices, MapFunction<OLD,NEW> translator, int parallelism) {
    +		Preconditions.checkNotNull(vertices);
    +		Preconditions.checkNotNull(translator);
    +		Preconditions.checkArgument(parallelism > 0 || parallelism == PARALLELISM_DEFAULT || parallelism == PARALLELISM_UNKNOWN,
    +			"The parallelism must be greater than zero.");
    +
    +		Class<Vertex<NEW,VV>> vertexClass = (Class<Vertex<NEW,VV>>)(Class<? extends Vertex>) Vertex.class;
    +		TypeInformation<NEW> newType = TypeExtractor.createTypeInfo(MapFunction.class, translator.getClass(), 1, null, null);
    +		TypeInformation<VV> vertexValueType = ((TupleTypeInfo<Vertex<OLD,VV>>) vertices.getType()).getTypeAt(1);
    +
    +		TupleTypeInfo<Vertex<NEW,VV>> returnType = new TupleTypeInfo<>(vertexClass, newType, vertexValueType);
    +
    +		return vertices
    +			.map(new TranslateVertexLabel<OLD,NEW,VV>(translator))
    +			.returns(returnType)
    +				.setParallelism(parallelism)
    +				.name("Translate vertex labels");
    +	}
    +
    +	/**
    +	 * Translate {@link Vertex} labels using the given {@link MapFunction}.
    +	 *
    +	 * @param <OLD> old vertex label type
    +	 * @param <NEW> new vertex label type
    +	 * @param <VV> vertex value type
    +	 */
    +	@ForwardedFields("1")
    +	private static class TranslateVertexLabel<OLD,NEW,VV>
    +	extends WrappingFunction<MapFunction<OLD,NEW>>
    +	implements MapFunction<Vertex<OLD,VV>, Vertex<NEW,VV>> {
    +		private Vertex<NEW,VV> vertex = new Vertex<>();
    +
    +		public TranslateVertexLabel(MapFunction<OLD,NEW> translator) {
    +			super(translator);
    +		}
    +
    +		@Override
    +		public Vertex<NEW,VV> map(Vertex<OLD,VV> value)
    +				throws Exception {
    +			vertex.f0 = wrappedFunction.map(value.f0);
    +			vertex.f1 = value.f1;
    +
    +			return vertex;
    +		}
    +	}
    +
    +	// --------------------------------------------------------------------------------------------
    +	//  Translate edge labels
    +	// --------------------------------------------------------------------------------------------
    +
    +	/**
    +	 * Translate {@link Edge} labels using the given {@link MapFunction}.
    +	 *
    +	 * @param edges input edges
    +	 * @param translator implements conversion from {@code OLD} to {@code NEW}
    +	 * @param <OLD> old edge label type
    +	 * @param <NEW> new edge label type
    +	 * @param <EV> edge value type
    +	 * @return translated edges
    +	 */
    +	public static <OLD,NEW,EV> DataSet<Edge<NEW,EV>> translateEdgeLabels(DataSet<Edge<OLD,EV>> edges, MapFunction<OLD,NEW> translator) {
    +		return translateEdgeLabels(edges, translator, PARALLELISM_UNKNOWN);
    +	}
    +
    +	/**
    +	 * Translate {@link Edge} labels using the given {@link MapFunction}.
    +	 *
    +	 * @param edges input edges
    +	 * @param translator implements conversion from {@code OLD} to {@code NEW}
    +	 * @param parallelism operator parallelism
    +	 * @param <OLD> old edge label type
    +	 * @param <NEW> new edge label type
    +	 * @param <EV> edge value type
    +	 * @return translated edges
    +	 */
    +	@SuppressWarnings("unchecked")
    +	public static <OLD,NEW,EV> DataSet<Edge<NEW,EV>> translateEdgeLabels(DataSet<Edge<OLD,EV>> edges, MapFunction<OLD,NEW> translator, int parallelism) {
    +		Preconditions.checkNotNull(edges);
    +		Preconditions.checkNotNull(translator);
    +		Preconditions.checkArgument(parallelism > 0 || parallelism == PARALLELISM_DEFAULT || parallelism == PARALLELISM_UNKNOWN,
    +			"The parallelism must be greater than zero.");
    +
    +		Class<Edge<NEW,EV>> edgeClass = (Class<Edge<NEW,EV>>)(Class<? extends Edge>) Edge.class;
    +		TypeInformation<NEW> newType = TypeExtractor.createTypeInfo(MapFunction.class, translator.getClass(), 1, null, null);
    +		TypeInformation<EV> edgeValueType = ((TupleTypeInfo<Edge<OLD,EV>>) edges.getType()).getTypeAt(2);
    +
    +		TupleTypeInfo<Edge<NEW,EV>> returnType = new TupleTypeInfo<>(edgeClass, newType, newType, edgeValueType);
    +
    +		return edges
    +			.map(new TranslateEdgeLabel<OLD,NEW,EV>(translator))
    +			.returns(returnType)
    +				.setParallelism(parallelism)
    --- End diff --
    
    indentation is off


[GitHub] flink pull request: [FLINK-3771] [gelly] Methods for translating G...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1900#discussion_r61550681
  
    --- Diff: flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala ---
    @@ -407,6 +407,36 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
       }
     
       /**
    +   * Relabels vertices and edges using the given MapFunction.
    +   *
    +   * @param translator implements conversion from K to NEW
    +   * @return relabeled graph
    +   */
    +  def translateGraphLabels[NEW: TypeInformation : ClassTag](translator: MapFunction[K, NEW]): Graph[NEW,VV,EV] = {
    --- End diff --
    
    In the Gelly code and docs, we refer to the vertex and edge keys as "Ids", not labels. I think we should rename the methods and javadocs to be consistent.


[GitHub] flink pull request: [FLINK-3771] [gelly] Methods for translating G...

Posted by greghogan <gi...@git.apache.org>.
Github user greghogan commented on the pull request:

    https://github.com/apache/flink/pull/1900#issuecomment-215451225
  
    One can now choose between:
    `graph.translateEdgeValues(new LongValueToStringValue())`
    `graph.run(new TranslateEdgeValues<>(new LongValueToStringValue()))`
    
    The latter is slightly longer but customizable with parallelism, etc.
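
The relationship between the two call styles can be sketched as a convenience method that runs a configurable algorithm object with its defaults. Everything here is a stand-in written for illustration: a `List` of edge values replaces the graph, `java.util.function.Function` replaces `MapFunction`, `UNSET` is a hypothetical sentinel, and the parallelism is only recorded, since there is no runtime to hand it to.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.function.Function;

// Stand-in for a GraphAlgorithm with one required and one optional setting.
final class TranslateEdgeValuesSketch<OLD, NEW> {

    static final int UNSET = -1; // hypothetical sentinel: "use the default"

    private final Function<OLD, NEW> translator; // required configuration
    private int parallelism = UNSET;             // optional configuration

    TranslateEdgeValuesSketch(Function<OLD, NEW> translator) {
        this.translator = Objects.requireNonNull(translator);
    }

    TranslateEdgeValuesSketch<OLD, NEW> setParallelism(int parallelism) {
        if (parallelism <= 0 && parallelism != UNSET) {
            throw new IllegalArgumentException("The parallelism must be greater than zero.");
        }
        this.parallelism = parallelism;
        return this;
    }

    int getParallelism() {
        return parallelism;
    }

    // A real operator would pass the parallelism to the runtime; this sketch
    // simply maps the values in order.
    List<NEW> run(List<OLD> edgeValues) {
        List<NEW> result = new ArrayList<>();
        for (OLD value : edgeValues) {
            result.add(translator.apply(value));
        }
        return result;
    }
}

final class GraphMethodsSketch {

    // Convenience method: same result, no customization.
    static <OLD, NEW> List<NEW> translateEdgeValues(
            List<OLD> edgeValues, Function<OLD, NEW> translator) {
        return new TranslateEdgeValuesSketch<>(translator).run(edgeValues);
    }

    public static void main(String[] args) {
        List<Long> values = Arrays.asList(1L, 2L, 3L);
        Function<Long, String> longToString = v -> Long.toString(v);

        List<String> viaMethod = translateEdgeValues(values, longToString);
        List<String> viaAlgorithm =
                new TranslateEdgeValuesSketch<>(longToString).setParallelism(4).run(values);
        System.out.println(viaMethod.equals(viaAlgorithm));
    }
}
```

Keeping the optional settings on the algorithm object means the convenience method needs no parallelism overload.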


[GitHub] flink pull request: [FLINK-3771] [gelly] Methods for translating G...

Posted by greghogan <gi...@git.apache.org>.
Github user greghogan commented on the pull request:

    https://github.com/apache/flink/pull/1900#issuecomment-214782976
  
    Hi @vasia,
    Comparing the APIs, `Translate` processes at the level of label or value whereas `mapVertices/Edges` process vertices and edges. What are use cases for the extra context (namely, the label) provided by `mapVertices/Edges`? Implementations of `Translator` can be reused across functions whereas the `MapFunction` provided to `mapVertices/Edges` are specific to that method. Would `mapLabels` require two new `MapFunction` or use an interface like `Translator`?
    
    `Translator` is wrapped by a specific `MapFunction` for each `Translate` function.
    
    Only one of the five current `Translate` functions operates on a `Graph`. `translateGraphLabels` could be moved to `Graph`, and `translateVertexValues` and `translateEdgeValues` could also be added to `Graph`. We don't always have a `Graph`; sometimes algorithms return a `DataSet` of vertices or edges, so having the static methods is helpful for these specialized `Vertex` and `Edge` operations.
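
The wrapping described above can be sketched as follows, loosely modeled on the `TranslateVertexLabel` class in the quoted diff. `java.util.function.Function` stands in for both the translator and Flink's `MapFunction`, and `MutableVertex` and `TranslateVertexIdSketch` are hypothetical names. As in the diff, the output object is reused between calls, so a caller must not hold on to a previous result.

```java
import java.util.Objects;
import java.util.function.Function;

// Mutable stand-in for Gelly's Vertex, which exposes public fields f0 and f1.
final class MutableVertex<K, VV> {
    K f0;  // vertex ID
    VV f1; // vertex value
}

// Wraps an element-level translator so that it operates on whole vertices.
final class TranslateVertexIdSketch<OLD, NEW, VV>
        implements Function<MutableVertex<OLD, VV>, MutableVertex<NEW, VV>> {

    private final Function<OLD, NEW> translator;

    // Reused output object, following the pattern in the quoted diff.
    private final MutableVertex<NEW, VV> output = new MutableVertex<>();

    TranslateVertexIdSketch(Function<OLD, NEW> translator) {
        this.translator = Objects.requireNonNull(translator);
    }

    @Override
    public MutableVertex<NEW, VV> apply(MutableVertex<OLD, VV> input) {
        output.f0 = translator.apply(input.f0); // translate the ID
        output.f1 = input.f1;                   // forward the value unchanged
        return output;
    }

    public static void main(String[] args) {
        TranslateVertexIdSketch<Long, String, Double> wrapper =
                new TranslateVertexIdSketch<>(id -> "v" + id);

        MutableVertex<Long, Double> in = new MutableVertex<>();
        in.f0 = 42L;
        in.f1 = 0.5;

        MutableVertex<String, Double> out = wrapper.apply(in);
        System.out.println(out.f0 + " " + out.f1);
    }
}
```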


[GitHub] flink pull request: [FLINK-3771] [gelly] Methods for translating G...

Posted by greghogan <gi...@git.apache.org>.
Github user greghogan commented on the pull request:

    https://github.com/apache/flink/pull/1900#issuecomment-215738750
  
    I pushed the most recent changes. In order of things I care more about to things I care less about:
    
    The indentation is as intended. I find method chaining on a single line quite difficult to follow. I find that algorithms are much easier to read if separate operators are easily identifiable.
    
    Cramming `implements`, `extends`, and `throws` onto a single line is also difficult to parse, particularly for complicated nested parameters which may themselves extend interfaces.
    
    There are empty lines throughout the code. The remaining two look fine to me.


[GitHub] flink pull request: [FLINK-3771] [gelly] Methods for translating G...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1900#discussion_r61552674
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateEdgeValues.java ---
    @@ -0,0 +1,81 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.asm.translate;
    +
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.graph.Edge;
    +import org.apache.flink.graph.Graph;
    +import org.apache.flink.graph.GraphAlgorithm;
    +import org.apache.flink.util.Preconditions;
    +
    +import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
    +import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_UNKNOWN;
    +import static org.apache.flink.graph.asm.translate.Translate.translateEdgeValues;
    +
    +/**
    + * Translate {@link Edge} values using the given {@link MapFunction}.
    + *
    + * @param <K> vertex label type
    + * @param <VV> vertex value type
    + * @param <OLD> old edge value type
    + * @param <NEW> new edge value type
    + */
    +public class TranslateEdgeValues<K, VV, OLD, NEW>
    +implements GraphAlgorithm<K, VV, OLD, Graph<K,VV,NEW>> {
    +
    +	// Required configuration
    +	private MapFunction<OLD,NEW> translator;
    +
    +	// Optional configuration
    +	private int parallelism = PARALLELISM_UNKNOWN;
    +
    +	/**
    +	 * Translate {@link Edge} values using the given {@link MapFunction}.
    +	 *
    +	 * @param translator implements conversion from {@code OLD} to {@code NEW}
    +	 */
    +	public TranslateEdgeValues(MapFunction<OLD,NEW> translator) {
    +		Preconditions.checkNotNull(translator);
    +
    --- End diff --
    
    empty line can be removed


[GitHub] flink pull request: [FLINK-3771] [gelly] Methods for translating G...

Posted by greghogan <gi...@git.apache.org>.
Github user greghogan commented on the pull request:

    https://github.com/apache/flink/pull/1900#issuecomment-215432236
  
    We can't translate vertex or edge IDs without modifying both as long as we want to return a `Graph`
    
    Setting parallelism is critical to scaling Flink to large datasets, and offering a parallelism variant of each method doubles the function count.
    
    `graph.run(new TranslateEdgeValues<LongValue,LongValue,LongValue,StringValue>(new LongValueToStringValue()))` is verbose due to the parameterization and we should definitely try to find a way to improve that.
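
The first point, that vertex and edge IDs have to be translated together, follows from both sharing the single key type `K` of `Graph<K, VV, EV>`. A minimal sketch under stated assumptions: `SketchEdge`, `SketchGraph`, `translateIds`, and `isConsistent` are hypothetical stand-ins, values are omitted, and `java.util.function.Function` replaces `MapFunction`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;

// Stand-in edge: only the two endpoint IDs, no value.
final class SketchEdge<K> {
    final K source;
    final K target;

    SketchEdge(K source, K target) {
        this.source = source;
        this.target = target;
    }
}

// Stand-in graph: vertices and edges share the ID type K.
final class SketchGraph<K> {
    final List<K> vertexIds;
    final List<SketchEdge<K>> edges;

    SketchGraph(List<K> vertexIds, List<SketchEdge<K>> edges) {
        this.vertexIds = vertexIds;
        this.edges = edges;
    }

    // Translating IDs touches the vertex set AND both endpoints of every edge;
    // otherwise the result could not be typed as a SketchGraph<NEW>.
    <NEW> SketchGraph<NEW> translateIds(Function<K, NEW> translator) {
        List<NEW> newIds = new ArrayList<>();
        for (K id : vertexIds) {
            newIds.add(translator.apply(id));
        }
        List<SketchEdge<NEW>> newEdges = new ArrayList<>();
        for (SketchEdge<K> e : edges) {
            newEdges.add(new SketchEdge<>(translator.apply(e.source), translator.apply(e.target)));
        }
        return new SketchGraph<>(newIds, newEdges);
    }

    // True if every edge endpoint refers to an existing vertex ID.
    boolean isConsistent() {
        Set<K> ids = new HashSet<>(vertexIds);
        for (SketchEdge<K> e : edges) {
            if (!ids.contains(e.source) || !ids.contains(e.target)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<SketchEdge<Long>> edges = new ArrayList<>();
        edges.add(new SketchEdge<>(1L, 2L));
        edges.add(new SketchEdge<>(2L, 3L));
        SketchGraph<Long> graph = new SketchGraph<>(Arrays.asList(1L, 2L, 3L), edges);

        SketchGraph<String> translated = graph.translateIds(id -> "v" + id);
        System.out.println(translated.isConsistent());
    }
}
```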


[GitHub] flink pull request: [FLINK-3771] [gelly] Methods for translating G...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on the pull request:

    https://github.com/apache/flink/pull/1900#issuecomment-216042040
  
    Thanks for the updates @greghogan. Could you please also add the new `GraphAlgorithm`s in the docs? And maybe explain when to use those instead of the Graph methods? Otherwise, it's good to merge imo.


[GitHub] flink pull request: [FLINK-3771] [gelly] Methods for translating G...

Posted by greghogan <gi...@git.apache.org>.
Github user greghogan commented on the pull request:

    https://github.com/apache/flink/pull/1900#issuecomment-216277509
  
    Updated docs. Merging ...


[GitHub] flink pull request: [FLINK-3771] [gelly] Methods for translating G...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1900#discussion_r61551440
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/Translate.java ---
    @@ -0,0 +1,346 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.asm.translate;
    +
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
    +import org.apache.flink.api.java.operators.translation.WrappingFunction;
    +import org.apache.flink.api.java.typeutils.TupleTypeInfo;
    +import org.apache.flink.api.java.typeutils.TypeExtractor;
    +import org.apache.flink.graph.Edge;
    +import org.apache.flink.graph.Vertex;
    +import org.apache.flink.util.Preconditions;
    +
    +import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
    +import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_UNKNOWN;
    +
    +/**
    + * Methods for translation of the type or modification of the data of graph
    + * labels, vertex values, and edge values.
    + */
    +public class Translate {
    +
    +	// --------------------------------------------------------------------------------------------
    +	//  Translate vertex labels
    +	// --------------------------------------------------------------------------------------------
    +
    +	/**
    +	 * Translate {@link Vertex} labels using the given {@link MapFunction}.
    +	 *
    +	 * @param vertices input vertices
    +	 * @param translator implements conversion from {@code OLD} to {@code NEW}
    +	 * @param <OLD> old vertex label type
    +	 * @param <NEW> new vertex label type
    +	 * @param <VV> vertex value type
    +	 * @return translated vertices
    +	 */
    +	public static <OLD,NEW,VV> DataSet<Vertex<NEW,VV>> translateVertexLabels(DataSet<Vertex<OLD,VV>> vertices, MapFunction<OLD,NEW> translator) {
    --- End diff --
    
    can you leave one space after commas? i.e. `<OLD, NEW, VV>` instead of `<OLD,NEW,VV>`?


[GitHub] flink pull request: [FLINK-3771] [gelly] Methods for translating G...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on the pull request:

    https://github.com/apache/flink/pull/1900#issuecomment-215663664
  
    My intention was to simplify the API, not have different ways to do the same thing. Anyway, if you think it's useful to have both the methods and the algorithms, I won't insist :)
    Can you please add a section in the Gelly guide describing how/when to use the translators? Thanks!


[GitHub] flink pull request: [FLINK-3771] [gelly] Methods for translating G...

Posted by greghogan <gi...@git.apache.org>.
Github user greghogan commented on the pull request:

    https://github.com/apache/flink/pull/1900#issuecomment-215040606
  
    I think this is a very good idea, and we can wrap the user's MapFunction to function like a MapFunction.
    
    Now we can start discussing algorithms :)


[GitHub] flink pull request: [FLINK-3771] [gelly] Methods for translating G...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1900#discussion_r61551225
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/LongValueToIntValue.java ---
    @@ -0,0 +1,47 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.asm.translate;
    +
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.types.IntValue;
    +import org.apache.flink.types.LongValue;
    +
    +/**
    + * Translate {@link LongValue} to {@link IntValue}.
    + *
    + * Throws {@link RuntimeException} for integer overflow.
    + */
    +public class LongValueToIntValue
    +implements MapFunction<LongValue, IntValue> {
    --- End diff --
    
    can be moved to the line above
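
The Javadoc in the diff above says the translator throws a `RuntimeException` on integer overflow, but the excerpt stops before the `map` body. As a rough sketch of what such a checked narrowing could look like (plain Java, no Flink types; `LongToIntNarrowing` and `toIntChecked` are hypothetical names, not code from the pull request):

```java
// Hypothetical helper for illustration only; this is not the pull request's implementation.
final class LongToIntNarrowing {

    private LongToIntNarrowing() {
    }

    /** Narrows a long to an int, failing loudly instead of silently truncating. */
    static int toIntChecked(long value) {
        // Math.toIntExact throws ArithmeticException, a RuntimeException subclass,
        // when the argument does not fit in an int.
        return Math.toIntExact(value);
    }
}
```

A translator's `map` method could presumably delegate to a check of this kind before setting the output `IntValue`.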



[GitHub] flink pull request: [FLINK-3771] [gelly] Methods for translating G...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on the pull request:

    https://github.com/apache/flink/pull/1900#issuecomment-215428452
  
    Hi @greghogan,
    
    I feel we're over-engineering something that should be much simpler. Translators are just a set of `MapFunction`s applied to vertex or edge IDs and values. I think the current API is too complex for what it does. For example,
    `graph.run(new TranslateEdgeValues<LongValue,LongValue,LongValue,StringValue>(new LongValueToStringValue()))` seems too verbose to me for mapping an edge value.
    It could be as simple as `graph.mapEdges(new LongValueToStringValue())`.
    
    Also, I don't think we need 6 methods. We could either add 4 methods to `Graph`, i.e. `mapVertexIDs`, `mapVertexValues`, `mapEdgeIds`, `mapEdgeValues` or extend the `mapEdges` and `mapVertices` methods. In the latter case, we will have to provide specialized mappers as you note above, e.g. `MapVertexValueLongToString`, `MapEdgeValueLongToString`, etc., but these can internally wrap the same mapper.
    
    Please let me know your thoughts on this! Thanks!
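
To illustrate the "specialized mappers can internally wrap the same mapper" idea from the comment above, here is a minimal sketch in plain Java. Everything in it is an assumption made for illustration: `Mapper` is a stand-in for Flink's `MapFunction` (without the checked exception), `SimpleEdge` is a stand-in for a Gelly edge, and `MapEdgeValue` and `LongToString` are hypothetical names, not classes from the pull request.

```java
import java.util.Objects;

// Stand-in for Flink's MapFunction so the sketch compiles without Flink on the classpath.
interface Mapper<T, O> {
    O map(T value);
}

// Stand-in for a graph edge: (source id, target id, edge value).
final class SimpleEdge<K, EV> {
    final K source;
    final K target;
    final EV value;

    SimpleEdge(K source, K target, EV value) {
        this.source = source;
        this.target = target;
        this.value = value;
    }
}

// Value-level mapper; the same instance could serve vertex values, edge values, or ids.
final class LongToString implements Mapper<Long, String> {
    @Override
    public String map(Long value) {
        return Long.toString(value);
    }
}

// Edge-level mapper that wraps a value-level mapper and leaves the endpoints untouched.
final class MapEdgeValue<K, EV, NV> implements Mapper<SimpleEdge<K, EV>, SimpleEdge<K, NV>> {
    private final Mapper<EV, NV> valueMapper;

    MapEdgeValue(Mapper<EV, NV> valueMapper) {
        this.valueMapper = Objects.requireNonNull(valueMapper);
    }

    @Override
    public SimpleEdge<K, NV> map(SimpleEdge<K, EV> edge) {
        return new SimpleEdge<>(edge.source, edge.target, valueMapper.map(edge.value));
    }
}
```

With a wrapper like this, the caller only supplies the value-level mapper, and the edge-level type parameters are inferred, which is roughly the ergonomics the shorter `graph.mapEdges(...)` call is asking for.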



[GitHub] flink pull request: [FLINK-3771] [gelly] Methods for translating G...

Posted by greghogan <gi...@git.apache.org>.
Github user greghogan commented on the pull request:

    https://github.com/apache/flink/pull/1900#issuecomment-216280065
  
    Implemented in 08e4bf9440a566c874c2b8e74ae2127ff264c672



[GitHub] flink pull request: [FLINK-3771] [gelly] Methods for translating G...

Posted by greghogan <gi...@git.apache.org>.
Github user greghogan commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1900#discussion_r61583101
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/LongValueAddOffset.java ---
    @@ -0,0 +1,50 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.asm.translate;
    +
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.types.LongValue;
    +
    +/**
    + * Translate {@link LongValue} by adding a constant offset value.
    + */
    +public class LongValueAddOffset
    +implements MapFunction<LongValue, LongValue> {
    +
    +	private final long offset;
    +
    +	private LongValue output = new LongValue();
    +
    +	/**
    +	 * Translate {@link LongValue} by adding a constant offset value.
    +	 *
    +	 * @param offset value to be added to each element
    +	 */
    +	public LongValueAddOffset(long offset) {
    +		this.offset = offset;
    +	}
    +
    +	@Override
    +	public LongValue map(LongValue value)
    +			throws Exception {
    +		output.setValue(offset + value.getValue());
    +
    --- End diff --
    
    Fixed in next push.



[GitHub] flink pull request: [FLINK-3771] [gelly] Methods for translating G...

Posted by vasia <gi...@git.apache.org>.
Github user vasia commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1900#discussion_r61551172
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/LongValueAddOffset.java ---
    @@ -0,0 +1,50 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph.asm.translate;
    +
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.types.LongValue;
    +
    +/**
    + * Translate {@link LongValue} by adding a constant offset value.
    + */
    +public class LongValueAddOffset
    +implements MapFunction<LongValue, LongValue> {
    +
    +	private final long offset;
    +
    +	private LongValue output = new LongValue();
    +
    +	/**
    +	 * Translate {@link LongValue} by adding a constant offset value.
    +	 *
    +	 * @param offset value to be added to each element
    +	 */
    +	public LongValueAddOffset(long offset) {
    +		this.offset = offset;
    +	}
    +
    +	@Override
    +	public LongValue map(LongValue value)
    +			throws Exception {
    --- End diff --
    
    can be moved to the line above

