You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/09/27 11:09:09 UTC
[04/20] flink git commit: [FLINK-2268] Remove Hadoop-related Akka
Serializers from runtime
[FLINK-2268] Remove Hadoop-related Akka Serializers from runtime
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/28079326
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/28079326
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/28079326
Branch: refs/heads/master
Commit: 28079326436d3409e93ad8ec3d18d6fca77d3e53
Parents: 21e6d52
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Tue Aug 22 16:40:28 2017 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Wed Sep 27 10:05:11 2017 +0200
----------------------------------------------------------------------
.../IOReadableWritableSerializer.scala | 62 --------------------
.../akka/serialization/WritableSerializer.scala | 61 -------------------
2 files changed, 123 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/28079326/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/serialization/IOReadableWritableSerializer.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/serialization/IOReadableWritableSerializer.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/serialization/IOReadableWritableSerializer.scala
deleted file mode 100644
index 54f26a9..0000000
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/serialization/IOReadableWritableSerializer.scala
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.akka.serialization
-
-import akka.serialization.JSerializer
-import org.apache.flink.core.io.IOReadableWritable
-import org.apache.flink.runtime.util.{DataOutputSerializer, DataInputDeserializer}
-
-import org.apache.flink.util.InstantiationUtil
-
-class IOReadableWritableSerializer extends JSerializer {
- val INITIAL_BUFFER_SIZE = 256
-
- override protected def fromBinaryJava(bytes: Array[Byte], manifest: Class[_]): AnyRef = {
- val in = new DataInputDeserializer(bytes, 0, bytes.length)
-
- val instance = InstantiationUtil.instantiate(manifest)
-
- if(!instance.isInstanceOf[IOReadableWritable]){
- throw new RuntimeException(s"Class $manifest is not of type IOReadableWritable.")
- }
-
- val ioRW = instance.asInstanceOf[IOReadableWritable]
-
- ioRW.read(in)
-
- ioRW
- }
-
- override def includeManifest: Boolean = true
-
- override def toBinary(o: AnyRef): Array[Byte] = {
- if(!o.isInstanceOf[IOReadableWritable]){
- throw new RuntimeException("Object is not of type IOReadableWritable.")
- }
-
- val ioRW = o.asInstanceOf[IOReadableWritable]
-
- val out = new DataOutputSerializer(INITIAL_BUFFER_SIZE)
- ioRW.write(out)
-
- out.wrapAsByteBuffer().array()
- }
-
- override def identifier: Int = 1337
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/28079326/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/serialization/WritableSerializer.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/serialization/WritableSerializer.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/serialization/WritableSerializer.scala
deleted file mode 100644
index 500175b..0000000
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/serialization/WritableSerializer.scala
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.akka.serialization
-
-import akka.serialization.JSerializer
-import org.apache.flink.runtime.util.{DataInputDeserializer, DataOutputSerializer}
-import org.apache.flink.util.InstantiationUtil
-import org.apache.hadoop.io.Writable
-
-class WritableSerializer extends JSerializer {
- val INITIAL_BUFFER_SIZE = 256
-
- override protected def fromBinaryJava(bytes: Array[Byte], manifest: Class[_]): AnyRef = {
- val in = new DataInputDeserializer(bytes, 0, bytes.length)
-
- val instance = InstantiationUtil.instantiate(manifest)
-
- if(!instance.isInstanceOf[Writable]){
- throw new RuntimeException(s"Class $manifest is not of type Writable.")
- }
-
- val writable = instance.asInstanceOf[Writable]
-
- writable.readFields(in)
-
- writable
- }
-
- override def includeManifest: Boolean = true
-
- override def toBinary(o: AnyRef): Array[Byte] = {
- if(!o.isInstanceOf[Writable]){
- throw new RuntimeException("Object is not of type Writable.")
- }
-
- val writable = o.asInstanceOf[Writable]
- val out = new DataOutputSerializer(INITIAL_BUFFER_SIZE)
-
- writable.write(out)
-
- out.wrapAsByteBuffer().array()
- }
-
- override def identifier: Int = 1337
-}