You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/09/27 11:09:09 UTC

[04/20] flink git commit: [FLINK-2268] Remove Hadoop-related Akka Serializers from runtime

[FLINK-2268] Remove Hadoop-related Akka Serializers from runtime


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/28079326
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/28079326
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/28079326

Branch: refs/heads/master
Commit: 28079326436d3409e93ad8ec3d18d6fca77d3e53
Parents: 21e6d52
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Tue Aug 22 16:40:28 2017 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Wed Sep 27 10:05:11 2017 +0200

----------------------------------------------------------------------
 .../IOReadableWritableSerializer.scala          | 62 --------------------
 .../akka/serialization/WritableSerializer.scala | 61 -------------------
 2 files changed, 123 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/28079326/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/serialization/IOReadableWritableSerializer.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/serialization/IOReadableWritableSerializer.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/serialization/IOReadableWritableSerializer.scala
deleted file mode 100644
index 54f26a9..0000000
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/serialization/IOReadableWritableSerializer.scala
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.akka.serialization
-
-import akka.serialization.JSerializer
-import org.apache.flink.core.io.IOReadableWritable
-import org.apache.flink.runtime.util.{DataOutputSerializer, DataInputDeserializer}
-
-import org.apache.flink.util.InstantiationUtil
-
-class IOReadableWritableSerializer extends JSerializer {
-  val INITIAL_BUFFER_SIZE = 256
-
-  override protected def fromBinaryJava(bytes: Array[Byte], manifest: Class[_]): AnyRef = {
-    val in = new DataInputDeserializer(bytes, 0, bytes.length)
-
-    val instance = InstantiationUtil.instantiate(manifest)
-
-    if(!instance.isInstanceOf[IOReadableWritable]){
-      throw new RuntimeException(s"Class $manifest is not of type IOReadableWritable.")
-    }
-
-    val ioRW = instance.asInstanceOf[IOReadableWritable]
-
-    ioRW.read(in)
-
-    ioRW
-  }
-
-  override def includeManifest: Boolean = true
-
-  override def toBinary(o: AnyRef): Array[Byte] = {
-    if(!o.isInstanceOf[IOReadableWritable]){
-      throw new RuntimeException("Object is not of type IOReadableWritable.")
-    }
-
-    val ioRW = o.asInstanceOf[IOReadableWritable]
-
-    val out = new DataOutputSerializer(INITIAL_BUFFER_SIZE)
-    ioRW.write(out)
-
-    out.wrapAsByteBuffer().array()
-  }
-
-  override def identifier: Int = 1337
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/28079326/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/serialization/WritableSerializer.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/serialization/WritableSerializer.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/serialization/WritableSerializer.scala
deleted file mode 100644
index 500175b..0000000
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/serialization/WritableSerializer.scala
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.akka.serialization
-
-import akka.serialization.JSerializer
-import org.apache.flink.runtime.util.{DataInputDeserializer, DataOutputSerializer}
-import org.apache.flink.util.InstantiationUtil
-import org.apache.hadoop.io.Writable
-
-class WritableSerializer extends JSerializer {
-  val INITIAL_BUFFER_SIZE = 256
-
-  override protected def fromBinaryJava(bytes: Array[Byte], manifest: Class[_]): AnyRef = {
-    val in = new DataInputDeserializer(bytes, 0, bytes.length)
-
-    val instance = InstantiationUtil.instantiate(manifest)
-
-    if(!instance.isInstanceOf[Writable]){
-      throw new RuntimeException(s"Class $manifest is not of type Writable.")
-    }
-
-    val writable = instance.asInstanceOf[Writable]
-
-    writable.readFields(in)
-
-    writable
-  }
-
-  override def includeManifest: Boolean = true
-
-  override def toBinary(o: AnyRef): Array[Byte] = {
-    if(!o.isInstanceOf[Writable]){
-      throw new RuntimeException("Object is not of type Writable.")
-    }
-
-    val writable = o.asInstanceOf[Writable]
-    val out = new DataOutputSerializer(INITIAL_BUFFER_SIZE)
-
-    writable.write(out)
-
-    out.wrapAsByteBuffer().array()
-  }
-
-  override def identifier: Int = 1337
-}