Posted to issues@flink.apache.org by gallenvara <gi...@git.apache.org> on 2016/04/06 10:10:06 UTC

[GitHub] flink pull request: [FLINK-3444] env.fromElements relies on the fi...

GitHub user gallenvara opened a pull request:

    https://github.com/apache/flink/pull/1857

    [FLINK-3444] env.fromElements relies on the first input element for determining the DataSet/DataStream type

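    Add a fromElements method that takes the base class type as an explicit argument, so the type is no longer determined from the first element. This avoids the exception thrown when the remaining elements are not instances of the first element's class.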
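    To make the problem in the title concrete, here is a minimal plain-Java sketch with no Flink dependency. It contrasts the old behaviour, where the element type is taken from `data[0].getClass()`, with the new overload's behaviour, where the caller passes the base class. The classes `Animal`, `Dog`, `Cat` and the helpers `inferFromFirst` and `allAssignableTo` are hypothetical illustrations, not Flink APIs. The instance check is an assumption about how a type mismatch surfaces, not a copy of Flink's code.

```java
public class FirstElementTypeDemo {

    // Hypothetical class hierarchy standing in for a user's element types.
    static class Animal {}
    static class Dog extends Animal {}
    static class Cat extends Animal {}

    // Models the old behaviour: the element type is taken from data[0] only.
    static Class<?> inferFromFirst(Object... data) {
        return data[0].getClass();
    }

    // Returns true if every element is an instance of the given type.
    static boolean allAssignableTo(Class<?> type, Object... data) {
        for (Object o : data) {
            if (!type.isInstance(o)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Object[] animals = {new Dog(), new Cat()};

        // Inferring from the first element picks Dog, which the Cat does not fit.
        Class<?> inferred = inferFromFirst(animals);
        System.out.println("inferred type: " + inferred.getSimpleName());             // Dog
        System.out.println("all elements fit: " + allAssignableTo(inferred, animals)); // false

        // Supplying the base class explicitly covers every element.
        Class<Animal> explicit = Animal.class;
        System.out.println("explicit type: " + explicit.getSimpleName());             // Animal
        System.out.println("all elements fit: " + allAssignableTo(explicit, animals)); // true
    }
}
```

    The explicit `Class<X>` argument is needed because Java erases generic type parameters at runtime, so a plain `X... data` signature cannot tell the callee which common supertype the caller intended.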

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/gallenvara/flink flink-3444

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1857.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1857
    
----
commit 1eac212afa448984817f36f05ae931d4f0fa4ab9
Author: gallenvara <ga...@126.com>
Date:   2016-04-06T08:04:32Z

    Add fromElements method with based class type to avoid the exception.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request: [FLINK-3444] env.fromElements relies on the fi...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/1857#issuecomment-208313579
  
    Merging this...


[GitHub] flink pull request: [FLINK-3444] env.fromElements relies on the fi...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the pull request:

    https://github.com/apache/flink/pull/1857#issuecomment-207442123
  
    +1


[GitHub] flink pull request: [FLINK-3444] env.fromElements relies on the fi...

Posted by gallenvara <gi...@git.apache.org>.
Github user gallenvara commented on the pull request:

    https://github.com/apache/flink/pull/1857#issuecomment-206250467
  
    @zentol Thanks a lot for the review. I will modify the code based on your advice!


[GitHub] flink pull request: [FLINK-3444] env.fromElements relies on the fi...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/1857


[GitHub] flink pull request: [FLINK-3444] env.fromElements relies on the fi...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the pull request:

    https://github.com/apache/flink/pull/1857#issuecomment-206248345
  
    Should this method also be added to the Scala environment?


[GitHub] flink pull request: [FLINK-3444] env.fromElements relies on the fi...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1857#discussion_r58673287
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java ---
    @@ -780,6 +780,33 @@ public CsvReader readCsvFile(String filePath) {
     		return fromCollection(Arrays.asList(data), TypeExtractor.getForObject(data[0]), Utils.getCallLocationName());
     	}
     	
    +	/**
    +	 * Creates a new data set that contains the given elements. The elements must all be of the same type,
    +	 * for example, all of the {@link String} or {@link Integer}. The sequence of elements must not be empty.
    +	 * <p>
    +	 * The framework will try and determine the exact type from the collection elements.
    +	 * In case of generic elements, it may be necessary to manually supply the type information
    +	 * via {@link #fromCollection(Collection, TypeInformation)}.
    +	 * <p>
    +	 * Note that this operation will result in a non-parallel data source, i.e. a data source with
    +	 * a parallelism of one.
    +	 *
    +	 * @param type The base class type for every element in the collection.
    +	 * @param data The elements to make up the data set.
    +	 * @return A DataSet representing the given list of elements.
    +	 */
    +	@SafeVarargs
    +	public final <X> DataSource<X> fromElements(Class<X> type, X... data) {
    +		if (data == null) {
    +			throw new IllegalArgumentException("The data must not be null.");
    +		}
    +		if (data.length == 0) {
    +			throw new IllegalArgumentException("The number of elements must not be zero.");
    +		}
    +		
    +		return fromCollection(Arrays.asList(data), TypeExtractor.getForClass(type), Utils.getCallLocationName());
    --- End diff --
    
    Why does the Scala variant throw a specific exception on getForClass() failure, but this one doesn't?
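    The question above is about failing with a clear message when type extraction fails. A minimal plain-Java sketch of that pattern follows, assuming no Flink on the classpath. `extractType` is a hypothetical stand-in for `TypeExtractor.getForClass` that simply rejects interfaces so the failure path can be exercised, and `Animal`, `Dog`, `Cat` are illustrative. The message names the declared base class and the batch `ExecutionEnvironment`. Unlike the diff, the sketch also chains the original exception as the cause; that is a design choice, not something the PR does.

```java
import java.util.Arrays;
import java.util.List;

public class FromElementsSketch {

    // Hypothetical element types used only for illustration.
    static class Animal {}
    static class Dog extends Animal {}
    static class Cat extends Animal {}

    // Hypothetical stand-in for a type extractor such as TypeExtractor.getForClass.
    // It rejects interfaces so that the failure path can be exercised.
    static <X> Class<X> extractType(Class<X> type) {
        if (type.isInterface()) {
            throw new IllegalStateException("cannot analyze interface " + type.getName());
        }
        return type;
    }

    // Models the reviewed overload: the declared base class, not data[0], fixes the type.
    @SafeVarargs
    static <X> List<X> fromElements(Class<X> type, X... data) {
        if (data == null) {
            throw new IllegalArgumentException("The data must not be null.");
        }
        if (data.length == 0) {
            throw new IllegalArgumentException("The number of elements must not be zero.");
        }
        try {
            extractType(type);
        } catch (Exception e) {
            // Report the declared type and the batch environment's method; keep the cause.
            throw new RuntimeException("Could not create TypeInformation for type " + type.getName()
                    + "; please specify the TypeInformation manually via "
                    + "ExecutionEnvironment#fromCollection(Collection, TypeInformation)", e);
        }
        // A real environment would build a data source here; the sketch returns a list view.
        return Arrays.asList(data);
    }

    public static void main(String[] args) {
        List<Animal> animals = fromElements(Animal.class, new Dog(), new Cat());
        System.out.println(animals.size()); // prints 2

        try {
            fromElements(Runnable.class, (Runnable) () -> { });
        } catch (RuntimeException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

    The sketch's method is `static`; the reviewed instance method is declared `public final` because Java only permits `@SafeVarargs` on static methods, final instance methods, constructors, and (since Java 9) private instance methods.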


[GitHub] flink pull request: [FLINK-3444] env.fromElements relies on the fi...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1857#discussion_r58673374
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java ---
    @@ -780,6 +780,33 @@ public CsvReader readCsvFile(String filePath) {
     		return fromCollection(Arrays.asList(data), TypeExtractor.getForObject(data[0]), Utils.getCallLocationName());
     	}
     	
    +	/**
    +	 * Creates a new data set that contains the given elements. The elements must all be of the same type,
    +	 * for example, all of the {@link String} or {@link Integer}. The sequence of elements must not be empty.
    +	 * <p>
    +	 * The framework will try and determine the exact type from the collection elements.
    --- End diff --
    
    This line doesn't seem valid for this method.


[GitHub] flink pull request: [FLINK-3444] env.fromElements relies on the fi...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1857#discussion_r58673800
  
    --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/SourceFunctionTest.java ---
    @@ -44,6 +45,18 @@ public void fromElementsTest() throws Exception {
     	}
     
     	@Test
    +	public void fromElementsWithBaseTypeTest1() {
    --- End diff --
    
    These tests appear to be in the wrong file; I would put them into the tests for the StreamExecutionEnvironment class.


[GitHub] flink pull request: [FLINK-3444] env.fromElements relies on the fi...

Posted by gallenvara <gi...@git.apache.org>.
Github user gallenvara commented on the pull request:

    https://github.com/apache/flink/pull/1857#issuecomment-206656056
  
    @zentol Code modified; I rebased the new commit onto the previous one.


[GitHub] flink pull request: [FLINK-3444] env.fromElements relies on the fi...

Posted by gallenvara <gi...@git.apache.org>.
Github user gallenvara commented on the pull request:

    https://github.com/apache/flink/pull/1857#issuecomment-208716571
  
    The CI build failure is unrelated to this PR.


[GitHub] flink pull request: [FLINK-3444] env.fromElements relies on the fi...

Posted by gallenvara <gi...@git.apache.org>.
Github user gallenvara commented on the pull request:

    https://github.com/apache/flink/pull/1857#issuecomment-206355988
  
    @zentol, PR updated. The Scala environment determines the type with `implicitly[TypeInformation[T]]`, which is always the class `Object`. In the case this issue describes, no exception is thrown. IMO, we should not add this method to it.


[GitHub] flink pull request: [FLINK-3444] env.fromElements relies on the fi...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1857#discussion_r58701096
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java ---
    @@ -777,7 +777,50 @@ public CsvReader readCsvFile(String filePath) {
     			throw new IllegalArgumentException("The number of elements must not be zero.");
     		}
     		
    -		return fromCollection(Arrays.asList(data), TypeExtractor.getForObject(data[0]), Utils.getCallLocationName());
    +		TypeInformation<X> typeInfo;
    +		try {
    +			typeInfo = TypeExtractor.getForObject(data[0]);
    +		}
    +		catch (Exception e) {
    +			throw new RuntimeException("Could not create TypeInformation for type " + data[0].getClass().getName()
    +					+ "; please specify the TypeInformation manually via "
    +					+ "StreamExecutionEnvironment#fromCollection(Collection, TypeInformation)");
    +		}
    +
    +		return fromCollection(Arrays.asList(data), typeInfo, Utils.getCallLocationName());
    +	}
    +	
    +	/**
    +	 * Creates a new data set that contains the given elements. The framework will determine the type according to the 
    +	 * based type user supplied. The elements should be the same or be the subclass to the based type. 
    +	 * The sequence of elements must not be empty.
    +	 * Note that this operation will result in a non-parallel data source, i.e. a data source with
    +	 * a parallelism of one.
    +	 *
    +	 * @param type The base class type for every element in the collection.
    +	 * @param data The elements to make up the data set.
    +	 * @return A DataSet representing the given list of elements.
    +	 */
    +	@SafeVarargs
    +	public final <X> DataSource<X> fromElements(Class<X> type, X... data) {
    +		if (data == null) {
    +			throw new IllegalArgumentException("The data must not be null.");
    +		}
    +		if (data.length == 0) {
    +			throw new IllegalArgumentException("The number of elements must not be zero.");
    +		}
    +		
    +		TypeInformation<X> typeInfo;
    +		try {
    +			typeInfo = TypeExtractor.getForClass(type);
    +		}
    +		catch (Exception e) {
    +			throw new RuntimeException("Could not create TypeInformation for type " + type.getName()
    +					+ "; please specify the TypeInformation manually via "
    +					+ "StreamExecutionEnvironment#fromCollection(Collection, TypeInformation)");
    --- End diff --
    
    This should be ExecutionEnvironment#fromCollection


[GitHub] flink pull request: [FLINK-3444] env.fromElements relies on the fi...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1857#discussion_r58701074
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java ---
    @@ -777,7 +777,50 @@ public CsvReader readCsvFile(String filePath) {
     			throw new IllegalArgumentException("The number of elements must not be zero.");
     		}
     		
    -		return fromCollection(Arrays.asList(data), TypeExtractor.getForObject(data[0]), Utils.getCallLocationName());
    +		TypeInformation<X> typeInfo;
    +		try {
    +			typeInfo = TypeExtractor.getForObject(data[0]);
    +		}
    +		catch (Exception e) {
    +			throw new RuntimeException("Could not create TypeInformation for type " + data[0].getClass().getName()
    +					+ "; please specify the TypeInformation manually via "
    +					+ "StreamExecutionEnvironment#fromCollection(Collection, TypeInformation)");
    --- End diff --
    
    This should be ExecutionEnvironment#fromCollection


[GitHub] flink pull request: [FLINK-3444] env.fromElements relies on the fi...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1857#discussion_r58673148
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java ---
    @@ -673,6 +673,43 @@ public TimeCharacteristic getStreamTimeCharacteristic() {
     	}
     
     	/**
    +	 * Creates a new data stream that contains the given elements. The elements must all be of the same type, for
    +	 * example, all of the {@link String} or {@link Integer}.
    +	 * <p>
    +	 * The framework will try and determine the exact type from the elements. In case of generic elements, it may be
    +	 * necessary to manually supply the type information via {@link #fromCollection(java.util.Collection,
    +	 * org.apache.flink.api.common.typeinfo.TypeInformation)}.
    +	 * <p>
    +	 * Note that this operation will result in a non-parallel data stream source, i.e. a data stream source with a
    +	 * degree of parallelism one.
    +	 *
    +	 * @param clazz
    +	 * 		The base class type in the collection.
    +	 * @param data
    +	 * 		The array of elements to create the data stream from.
    +	 * @param <OUT>
    +	 * 		The type of the returned data stream
    +	 * @return The data stream representing the given array of elements
    +	 */
    +	@SafeVarargs
    +	public final <OUT> DataStreamSource<OUT> fromElements(Class<OUT> clazz, OUT... data) {
    +		if (data.length == 0) {
    +			throw new IllegalArgumentException("fromElements needs at least one element as argument");
    +		}
    +
    +		TypeInformation<OUT> typeInfo;
    +		try {
    +			typeInfo = TypeExtractor.getForClass(clazz);
    +		}
    +		catch (Exception e) {
    +			throw new RuntimeException("Could not create TypeInformation for type " + data[0].getClass().getName()
    --- End diff --
    
    Shouldn't this exception contain clazz.getName() instead?

