Posted to issues@flink.apache.org by aljoscha <gi...@git.apache.org> on 2014/11/26 13:28:14 UTC

[GitHub] incubator-flink pull request: Add support for Subclasses, Interfac...

GitHub user aljoscha opened a pull request:

    https://github.com/apache/incubator-flink/pull/236

    Add support for Subclasses, Interfaces, Abstract Classes as POJOs

    

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/aljoscha/incubator-flink subclass-serializer

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-flink/pull/236.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #236
    
----
commit f9ce67c9d7a8a507fd06ac041d66c9d29fe40a0f
Author: Aljoscha Krettek <al...@gmail.com>
Date:   2014-11-26T12:27:06Z

    Add support for Subclasses, Interfaces, Abstract Classes as POJOs

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] incubator-flink pull request: Add support for Subclasses, Interfac...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/236#discussion_r20935866
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java ---
    @@ -181,6 +266,15 @@ public void serialize(T value, DataOutputView target) throws IOException {
     		} else {
     			target.writeBoolean(false);
     		}
    +
    +		if (clazz == value.getClass()) {
    +			target.writeBoolean(true);
    --- End diff --
    
    True



[GitHub] incubator-flink pull request: Add support for Subclasses, Interfac...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the pull request:

    https://github.com/apache/incubator-flink/pull/236#issuecomment-64604464
  
    Good work!
    I think we need a few end-to-end tests for these changes before we can merge them.



[GitHub] incubator-flink pull request: Add support for Subclasses, Interfac...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/236#discussion_r20935860
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java ---
    @@ -116,25 +168,36 @@ public boolean isStateful() {
     	
     	@Override
     	public T createInstance() {
    +		if (clazz.isInterface() || Modifier.isAbstract(clazz.getModifiers())) {
    +			return null;
    +		}
     		try {
     			T t = clazz.newInstance();
    -		
    -			for (int i = 0; i < numFields; i++) {
    -				fields[i].set(t, fieldSerializers[i].createInstance());
    -			}
    -			
    +			initializeFields(t);
     			return t;
     		}
     		catch (Exception e) {
    -			throw new RuntimeException("Cannot instantiate class.", e);
    +//			throw new RuntimeException("Cannot instantiate class.", e);
    +			return null;
    --- End diff --
    
    Changing it.



[GitHub] incubator-flink pull request: Add support for Subclasses, Interfac...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/incubator-flink/pull/236#issuecomment-65960764
  
    I think this is a fair start. Can we make the tag more compact (single byte integer)? Otherwise, I think it is a good idea to get this in.



[GitHub] incubator-flink pull request: Add support for Subclasses, Interfac...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the pull request:

    https://github.com/apache/incubator-flink/pull/236#issuecomment-64663589
  
    @rmetzger I hope I addressed your concerns. :dancers: 



[GitHub] incubator-flink pull request: Add support for Subclasses, Interfac...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/236#discussion_r20934362
  
    --- Diff: flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java ---
    @@ -1061,6 +1062,7 @@ public void testFunctionInputInOutputMultipleTimes2() {
     	public abstract class AbstractClass {}
     
     	@Test
    +	@Ignore
    --- End diff --
    
    I thought that's what the PR is introducing?



[GitHub] incubator-flink pull request: Add support for Subclasses, Interfac...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the pull request:

    https://github.com/apache/incubator-flink/pull/236#issuecomment-65063153
  
    The code looks good.
    I tried it out and found that this is not working: https://github.com/rmetzger/incubator-flink/commit/d391ee184b47541fd83709e858eb10757111d14a



[GitHub] incubator-flink pull request: Add support for Subclasses, Interfac...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/236#discussion_r20936183
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java ---
    @@ -100,8 +110,50 @@ private void readObject(ObjectInputStream in)
     						+ " (" + fieldName + ")");
     			}
     		}
    +
    +		subclassSerializerCache = new HashMap<Class<?>, TypeSerializer>();
    +	}
    +
    +	private TypeSerializer getSubclassSerializer(Class<?> subclass) {
    +		TypeSerializer<?> result = subclassSerializerCache.get(subclass);
    --- End diff --
    
    Oops .. I was apparently confused. Since your key is a class, everything is fine.
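
The point about the key can be illustrated with a minimal sketch. `java.lang.Class` inherits identity-based `equals()` and `hashCode()` from `Object`, so a `HashMap` keyed by `Class<?>` works no matter how the cached values compare. `SubclassCacheDemo` and `DummySerializer` are hypothetical stand-ins, not classes from the pull request.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical demo (not part of the PR): a per-class cache keyed by Class<?>.
final class SubclassCacheDemo {

    // Stand-in for a serializer that deliberately does NOT override equals()/hashCode().
    static final class DummySerializer {
        final Class<?> type;

        DummySerializer(Class<?> type) {
            this.type = type;
        }
    }

    private final Map<Class<?>, DummySerializer> cache = new HashMap<>();
    int created = 0; // counts how many serializers were actually built

    // Looks up the serializer for a class, creating and caching it on first use.
    DummySerializer get(Class<?> subclass) {
        DummySerializer result = cache.get(subclass);
        if (result == null) {
            result = new DummySerializer(subclass);
            cache.put(subclass, result);
            created++;
        }
        return result;
    }

    public static void main(String[] args) {
        SubclassCacheDemo demo = new SubclassCacheDemo();
        DummySerializer first = demo.get(String.class);
        DummySerializer second = demo.get(String.class);
        System.out.println("same instance: " + (first == second) + ", created: " + demo.created);
    }
}
```

Only the key's `equals()`/`hashCode()` matter for the lookup; the values are never compared by the map.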



[GitHub] incubator-flink pull request: Add support for Subclasses, Interfac...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/236#discussion_r20934292
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java ---
    @@ -195,22 +289,44 @@ public void serialize(T value, DataOutputView target) throws IOException {
     			throw new RuntimeException("Error during POJO copy, this should not happen since we check the fields" +
     					"before.");
     		}
    +
    +		// Serialize subclass fields as well.
    +		if (!(clazz == value.getClass())) {
    +			Class<?> subclass = value.getClass();
    +			TypeSerializer subclassSerializer = getSubclassSerializer(subclass);
    +			subclassSerializer.serialize(value, target);
    +		}
     	}
     
     	@Override
    +	@SuppressWarnings("unchecked")
     	public T deserialize(DataInputView source) throws IOException {
     		boolean isNull = source.readBoolean();
     		if(isNull) {
     			return null;
     		}
    +
     		T target;
    -		try {
    -			target = clazz.newInstance();
    -		}
    -		catch (Throwable t) {
    -			throw new RuntimeException("Cannot instantiate class.", t);
    +
    +		Class<?> subclass = null;
    +		TypeSerializer subclassSerializer = null;
    +
    +		boolean isExactClass = source.readBoolean();
    +		if (!isExactClass) {
    +			String subclassName = source.readUTF();
    +			try {
    +				subclass = Class.forName(subclassName, true, this.getClass().getClassLoader());
    +			} catch (ClassNotFoundException e) {
    +				throw new RuntimeException("Cannot instantiate class.", e);
    +			}
    +			subclassSerializer = getSubclassSerializer(subclass);
    +			target = (T) subclassSerializer.createInstance();
    +			// also initialize fields for which the subclass serializer is not responsible
    +			initializeFields(target);
    +		} else {
    +			target = createInstance();
    --- End diff --
    
    you are swallowing instantiation exceptions here



[GitHub] incubator-flink pull request: Add support for Subclasses, Interfac...

Posted by StephanEwen <gi...@git.apache.org>.
Github user StephanEwen commented on the pull request:

    https://github.com/apache/incubator-flink/pull/236#issuecomment-64811119
  
    It is a start, though; it gets us further than the state before...



[GitHub] incubator-flink pull request: Add support for Subclasses, Interfac...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on the pull request:

    https://github.com/apache/incubator-flink/pull/236#issuecomment-64603372
  
    This change does not add support for `Writable` interfaces, right?



[GitHub] incubator-flink pull request: Add support for Subclasses, Interfac...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the pull request:

    https://github.com/apache/incubator-flink/pull/236#issuecomment-64809571
  
    Any thoughts about this?
    
    I myself am not 100% happy with this. When the user uses a subclass that cannot be analysed by our TypeExtractor, the whole program will fail with an exception at runtime, not at pre-flight time.



[GitHub] incubator-flink pull request: Add support for Subclasses, Interfac...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/236#discussion_r20933753
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java ---
    @@ -116,25 +168,36 @@ public boolean isStateful() {
     	
     	@Override
     	public T createInstance() {
    +		if (clazz.isInterface() || Modifier.isAbstract(clazz.getModifiers())) {
    +			return null;
    +		}
     		try {
     			T t = clazz.newInstance();
    -		
    -			for (int i = 0; i < numFields; i++) {
    -				fields[i].set(t, fieldSerializers[i].createInstance());
    -			}
    -			
    +			initializeFields(t);
     			return t;
     		}
     		catch (Exception e) {
    -			throw new RuntimeException("Cannot instantiate class.", e);
    +//			throw new RuntimeException("Cannot instantiate class.", e);
    +			return null;
    --- End diff --
    
    Why are you returning `null` on error?
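
One possible way to resolve this, sketched under assumptions: return `null` only for types that can never be instantiated (interfaces and abstract classes) and keep propagating genuine reflection failures. `InstanceFactory` is a hypothetical helper; the fix that actually landed is not shown in this thread. The sketch uses `getDeclaredConstructor().newInstance()` where the diff uses `clazz.newInstance()`.

```java
import java.lang.reflect.Modifier;

// Hypothetical helper (not the PR's code): instantiates a POJO class without hiding errors.
final class InstanceFactory {

    static <T> T createInstance(Class<T> clazz) {
        // Interfaces and abstract classes have no instances of their own;
        // the caller is expected to instantiate a concrete subclass instead.
        if (clazz.isInterface() || Modifier.isAbstract(clazz.getModifiers())) {
            return null;
        }
        try {
            return clazz.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            // Surface the failure with its cause instead of returning null.
            throw new RuntimeException("Cannot instantiate class " + clazz.getName(), e);
        }
    }

    public static void main(String[] args) {
        System.out.println(createInstance(Runnable.class));            // interface
        System.out.println(createInstance(StringBuilder.class).length());
        try {
            createInstance(Integer.class);                             // no no-arg constructor
        } catch (RuntimeException e) {
            System.out.println("failed: " + e.getMessage());
        }
    }
}
```

With this split, a `null` result has exactly one meaning (the declared type is not concrete), and a missing or failing constructor shows up as an exception with its cause attached.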



[GitHub] incubator-flink pull request: Add support for Subclasses, Interfac...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the pull request:

    https://github.com/apache/incubator-flink/pull/236#issuecomment-64811314
  
    That's true. :D Before, the job would just produce incorrect results because the PojoSerializer would only serialise the fields it knows about.



[GitHub] incubator-flink pull request: Add support for Subclasses, Interfac...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/236#discussion_r20934116
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java ---
    @@ -100,8 +110,50 @@ private void readObject(ObjectInputStream in)
     						+ " (" + fieldName + ")");
     			}
     		}
    +
    +		subclassSerializerCache = new HashMap<Class<?>, TypeSerializer>();
    +	}
    +
    +	private TypeSerializer getSubclassSerializer(Class<?> subclass) {
    +		TypeSerializer<?> result = subclassSerializerCache.get(subclass);
    --- End diff --
    
    Are we sure that all the TypeSerializers implement hashCode() and equals() properly?



[GitHub] incubator-flink pull request: Add support for Subclasses, Interfac...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/236#discussion_r20935941
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java ---
    @@ -100,8 +110,50 @@ private void readObject(ObjectInputStream in)
     						+ " (" + fieldName + ")");
     			}
     		}
    +
    +		subclassSerializerCache = new HashMap<Class<?>, TypeSerializer>();
    +	}
    +
    +	private TypeSerializer getSubclassSerializer(Class<?> subclass) {
    +		TypeSerializer<?> result = subclassSerializerCache.get(subclass);
    --- End diff --
    
    Not applicable here ... :pensive: 



[GitHub] incubator-flink pull request: Add support for Subclasses, Interfac...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the pull request:

    https://github.com/apache/incubator-flink/pull/236#issuecomment-66791457
  
    I now also added a way for users to register classes and actual support for class tags. The performance numbers I posted earlier are still valid: 
    
     - WordCount Tuple: 440 sec
     - WordCount POJO (w/o this PR): 540 sec
     - WordCount POJO (w/ this PR): 550 sec
     - WordCount POJO Subclass: 2443 sec
     - WordCount POJO Subclass with Tagging: 830 sec




Re: [GitHub] incubator-flink pull request: Add support for Subclasses, Interfac...

Posted by Ufuk Celebi <uc...@apache.org>.
Thanks for the update. :)

[GitHub] incubator-flink pull request: Add support for Subclasses, Interfac...

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the pull request:

    https://github.com/apache/incubator-flink/pull/236#issuecomment-65228151
  
    I ran some tests to measure the performance impact. I tried our regular WordCount example, which uses Tuples, a POJO WordCount and a POJO WordCount where the user function emits a subclass of the type of the DataSet.
    
    The results:
    
     - WordCount Tuple: 440 sec
     - WordCount POJO (w/o this PR): 540 sec
     - WordCount POJO (w/ this PR): 550 sec
     - WordCount POJO Subclass: 2443 sec
     - WordCount POJO Subclass with Tagging: 830 sec
    
    The "tagging" version is a mockup of how it would behave if we allowed users to register classes, so that only an ID has to be transferred, similar to how Kryo does it.
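
The tagging idea can be sketched roughly as follows, assuming a registry that hands out small integer tags in registration order. `ClassTagRegistry` and its methods are hypothetical names, not Flink's API, and the mockup used for the measurements above may have worked differently.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical registry (not Flink's API): maps registered classes to small integer tags.
final class ClassTagRegistry {
    private final Map<Class<?>, Integer> tagByClass = new HashMap<>();
    private final List<Class<?>> classByTag = new ArrayList<>();

    // Registers a class and returns its tag; registering it again returns the same tag.
    int register(Class<?> clazz) {
        Integer existing = tagByClass.get(clazz);
        if (existing != null) {
            return existing;
        }
        if (classByTag.size() >= 256) {
            throw new IllegalStateException("A single-byte tag allows at most 256 classes");
        }
        int tag = classByTag.size();
        tagByClass.put(clazz, tag);
        classByTag.add(clazz);
        return tag;
    }

    // Returns the tag for a class, or -1 if it was never registered
    // (a serializer could then fall back to writing the full class name).
    int tagOf(Class<?> clazz) {
        Integer tag = tagByClass.get(clazz);
        return tag == null ? -1 : tag;
    }

    // Resolves a tag back to its class on the reading side.
    Class<?> classOf(int tag) {
        return classByTag.get(tag);
    }

    public static void main(String[] args) {
        ClassTagRegistry registry = new ClassTagRegistry();
        registry.register(Integer.class);
        registry.register(Long.class);

        Number value = 42L;
        int tag = registry.tagOf(value.getClass());
        System.out.println("tag " + tag + " resolves to " + registry.classOf(tag).getName());
    }
}
```

Writer and reader have to register the same classes in the same order for the tags to agree. In exchange, each record carries one small integer where the untagged variant in the diff writes the class name with `writeUTF` and resolves it with `Class.forName` on every read.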




[GitHub] incubator-flink pull request: Add support for Subclasses, Interfac...

Posted by rmetzger <gi...@git.apache.org>.
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/236#discussion_r20933933
  
    --- Diff: flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java ---
    @@ -181,6 +266,15 @@ public void serialize(T value, DataOutputView target) throws IOException {
     		} else {
     			target.writeBoolean(false);
     		}
    +
    +		if (clazz == value.getClass()) {
    +			target.writeBoolean(true);
    --- End diff --
    
    So now we are writing 2 bytes (null value, subclass) for these flags. Can't we use one byte and use the bits as flags?
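
A minimal sketch of the single-byte suggestion, assuming plain `java.io` streams in place of Flink's `DataOutputView` and `DataInputView`. `HeaderFlags` and its constants are hypothetical and not part of the pull request.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical helper (not part of the PR): packs both per-record flags into one byte.
final class HeaderFlags {
    static final int IS_NULL = 1;          // bit 0: the value is null
    static final int IS_SUBCLASS = 1 << 1; // bit 1: runtime class differs from the declared class

    // Writes a single header byte instead of two writeBoolean() calls.
    static byte[] encode(boolean isNull, boolean isSubclass) {
        int flags = 0;
        if (isNull) {
            flags |= IS_NULL;
        }
        if (isSubclass) {
            flags |= IS_SUBCLASS;
        }
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeByte(flags);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Reads the header byte back; callers test individual bits with the helpers below.
    static int decode(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            return in.readUnsignedByte();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static boolean isNull(int flags) {
        return (flags & IS_NULL) != 0;
    }

    static boolean isSubclass(int flags) {
        return (flags & IS_SUBCLASS) != 0;
    }

    public static void main(String[] args) {
        byte[] header = encode(false, true);
        int flags = decode(header);
        System.out.println(header.length + " byte, null=" + isNull(flags)
                + ", subclass=" + isSubclass(flags));
    }
}
```

The six unused bits leave room for further flags later without growing the per-record header.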

