You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by td...@apache.org on 2015/05/21 20:39:38 UTC
spark git commit: [SPARK-7787] [STREAMING] Fix serialization issue of
SerializableAWSCredentials
Repository: spark
Updated Branches:
refs/heads/master 8730fbb47 -> 4b7ff3092
[SPARK-7787] [STREAMING] Fix serialization issue of SerializableAWSCredentials
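SerializableAWSCredentials extended BasicAWSCredentials, which is not Serializable and lacks a default (no-argument) constructor; Java deserialization requires such a constructor on the first non-serializable superclass, so deserializing the credentials failed. This occurs only when the AWS credentials are explicitly specified through KinesisUtils.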
Author: Tathagata Das <ta...@gmail.com>
Closes #6316 from tdas/SPARK-7787 and squashes the following commits:
248ca5c [Tathagata Das] Fixed serializability
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4b7ff309
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4b7ff309
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4b7ff309
Branch: refs/heads/master
Commit: 4b7ff3092c53827817079e0810563cbb0b9d0747
Parents: 8730fbb
Author: Tathagata Das <ta...@gmail.com>
Authored: Thu May 21 11:39:32 2015 -0700
Committer: Tathagata Das <ta...@gmail.com>
Committed: Thu May 21 11:39:32 2015 -0700
----------------------------------------------------------------------
.../streaming/kinesis/KinesisReceiver.scala | 5 +++-
.../kinesis/KinesisReceiverSuite.scala | 30 +++++++++-----------
2 files changed, 17 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/4b7ff309/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
----------------------------------------------------------------------
diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
index 9016449..800202e 100644
--- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
+++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
@@ -31,7 +31,10 @@ import org.apache.spark.util.Utils
private[kinesis]
case class SerializableAWSCredentials(accessKeyId: String, secretKey: String)
- extends BasicAWSCredentials(accessKeyId, secretKey) with Serializable
+ extends AWSCredentials {
+ override def getAWSAccessKeyId: String = accessKeyId
+ override def getAWSSecretKey: String = secretKey
+}
/**
* Custom AWS Kinesis-specific implementation of Spark Streaming's Receiver.
http://git-wip-us.apache.org/repos/asf/spark/blob/4b7ff309/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala
----------------------------------------------------------------------
diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala
index 7c17ee9..cd19c33 100644
--- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala
+++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala
@@ -20,27 +20,18 @@ import java.nio.ByteBuffer
import scala.collection.JavaConversions.seqAsJavaList
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.Milliseconds
-import org.apache.spark.streaming.Seconds
-import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.TestSuiteBase
-import org.apache.spark.util.{ManualClock, Clock}
-
-import org.mockito.Mockito._
-import org.scalatest.BeforeAndAfter
-import org.scalatest.Matchers
-import org.scalatest.mock.MockitoSugar
-
-import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException
-import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibDependencyException
-import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException
-import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException
+import com.amazonaws.services.kinesis.clientlibrary.exceptions.{InvalidStateException, KinesisClientLibDependencyException, ShutdownException, ThrottlingException}
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason
import com.amazonaws.services.kinesis.model.Record
-import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
+import org.mockito.Mockito._
+import org.scalatest.{BeforeAndAfter, Matchers}
+import org.scalatest.mock.MockitoSugar
+
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{Milliseconds, Seconds, StreamingContext, TestSuiteBase}
+import org.apache.spark.util.{Clock, ManualClock, Utils}
/**
* Suite of Kinesis streaming receiver tests focusing mostly on the KinesisRecordProcessor
@@ -99,6 +90,11 @@ class KinesisReceiverSuite extends TestSuiteBase with Matchers with BeforeAndAft
ssc.stop()
}
+ test("check serializability of SerializableAWSCredentials") {
+ Utils.deserialize[SerializableAWSCredentials](
+ Utils.serialize(new SerializableAWSCredentials("x", "y")))
+ }
+
test("process records including store and checkpoint") {
when(receiverMock.isStopped()).thenReturn(false)
when(checkpointStateMock.shouldCheckpoint()).thenReturn(true)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org