You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2015/04/29 04:57:48 UTC
[1/4] samza git commit: SAMZA-465: Use coordinator stream and
eliminate CheckpointManager
Repository: samza
Updated Branches:
refs/heads/master c37d75270 -> 23fb2e1c0
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-test/src/main/config/perf/container-performance.properties
----------------------------------------------------------------------
diff --git a/samza-test/src/main/config/perf/container-performance.properties b/samza-test/src/main/config/perf/container-performance.properties
index 86b2d58..7dcab02 100644
--- a/samza-test/src/main/config/perf/container-performance.properties
+++ b/samza-test/src/main/config/perf/container-performance.properties
@@ -28,5 +28,15 @@ task.opts=-Xmx3072m -XX:+UseConcMarkSweepGC
task.class=org.apache.samza.test.performance.TestPerformanceTask
task.inputs=mock.stream0,mock.stream1,mock.stream2,mock.stream3,mock.stream4,mock.stream5,mock.stream6,mock.stream7,mock.stream8,mock.stream9,mock.stream10,mock.stream11,mock.stream12,mock.stream13,mock.stream14,mock.stream15,mock.stream16,mock.stream17,mock.stream18,mock.stream19,mock.stream20,mock.stream21,mock.stream22,mock.stream23,mock.stream24,mock.stream25,mock.stream26,mock.stream27,mock.stream28,mock.stream29,mock.stream30,mock.stream31,mock.stream32,mock.stream33,mock.stream34,mock.stream35,mock.stream36,mock.stream37,mock.stream38,mock.stream39,mock.stream40,mock.stream41,mock.stream42,mock.stream43,mock.stream44,mock.stream45,mock.stream46,mock.stream47,mock.stream48,mock.stream49,mock.stream50,mock.stream51,mock.stream52,mock.stream53,mock.stream54,mock.stream55,mock.stream56,mock.stream57,mock.stream58,mock.stream59,mock.stream60,mock.stream61,mock.stream62,mock.stream63,mock.stream64,mock.stream65,mock.stream66,mock.stream67,mock.stream68,mock.stream69,mock.stream70,m
ock.stream71,mock.stream72,mock.stream73,mock.stream74,mock.stream75,mock.stream76,mock.stream77,mock.stream78,mock.stream79,mock.stream80,mock.stream81,mock.stream82,mock.stream83,mock.stream84,mock.stream85,mock.stream86,mock.stream87,mock.stream88,mock.stream89,mock.stream90,mock.stream91,mock.stream92,mock.stream93,mock.stream94,mock.stream95,mock.stream96,mock.stream97,mock.stream98,mock.stream99,mock.stream100,mock.stream101,mock.stream102,mock.stream103,mock.stream104,mock.stream105,mock.stream106,mock.stream107,mock.stream108,mock.stream109,mock.stream110,mock.stream111,mock.stream112,mock.stream113,mock.stream114,mock.stream115,mock.stream116,mock.stream117,mock.stream118,mock.stream119,mock.stream120,mock.stream121,mock.stream122,mock.stream123,mock.stream124,mock.stream125,mock.stream126,mock.stream127,mock.stream128,mock.stream129,mock.stream130,mock.stream131,mock.stream132,mock.stream133,mock.stream134,mock.stream135,mock.stream136,mock.stream137,mock.stream138,mock.st
ream139,mock.stream140,mock.stream141,mock.stream142,mock.stream143,mock.stream144,mock.stream145,mock.stream146,mock.stream147,mock.stream148,mock.stream149,mock.stream150,mock.stream151,mock.stream152,mock.stream153,mock.stream154,mock.stream155,mock.stream156,mock.stream157,mock.stream158,mock.stream159,mock.stream160,mock.stream161,mock.stream162,mock.stream163,mock.stream164,mock.stream165,mock.stream166,mock.stream167,mock.stream168,mock.stream169,mock.stream170,mock.stream171,mock.stream172,mock.stream173,mock.stream174,mock.stream175,mock.stream176,mock.stream177,mock.stream178,mock.stream179,mock.stream180,mock.stream181,mock.stream182,mock.stream183,mock.stream184,mock.stream185,mock.stream186,mock.stream187,mock.stream188,mock.stream189,mock.stream190,mock.stream191,mock.stream192,mock.stream193,mock.stream194,mock.stream195,mock.stream196,mock.stream197,mock.stream198,mock.stream199,mock.stream200,mock.stream201,mock.stream202,mock.stream203,mock.stream204,mock.stream205
,mock.stream206,mock.stream207,mock.stream208,mock.stream209,mock.stream210,mock.stream211,mock.stream212,mock.stream213,mock.stream214,mock.stream215,mock.stream216,mock.stream217,mock.stream218,mock.stream219,mock.stream220,mock.stream221,mock.stream222,mock.stream223,mock.stream224,mock.stream225,mock.stream226,mock.stream227,mock.stream228,mock.stream229,mock.stream230,mock.stream231,mock.stream232,mock.stream233,mock.stream234,mock.stream235,mock.stream236,mock.stream237,mock.stream238,mock.stream239,mock.stream240,mock.stream241,mock.stream242,mock.stream243,mock.stream244,mock.stream245,mock.stream246,mock.stream247,mock.stream248,mock.stream249,mock.stream250,mock.stream251,mock.stream252,mock.stream253,mock.stream254,mock.stream255,mock.stream256,mock.stream257,mock.stream258,mock.stream259,mock.stream260,mock.stream261,mock.stream262,mock.stream263,mock.stream264,mock.stream265,mock.stream266,mock.stream267,mock.stream268,mock.stream269,mock.stream270,mock.stream271,mock.s
tream272,mock.stream273,mock.stream274,mock.stream275,mock.stream276,mock.stream277,mock.stream278,mock.stream279,mock.stream280,mock.stream281,mock.stream282,mock.stream283,mock.stream284,mock.stream285,mock.stream286,mock.stream287,mock.stream288,mock.stream289,mock.stream290,mock.stream291,mock.stream292,mock.stream293,mock.stream294,mock.stream295,mock.stream296,mock.stream297,mock.stream298,mock.stream299,mock.stream300,mock.stream301,mock.stream302,mock.stream303,mock.stream304,mock.stream305,mock.stream306,mock.stream307,mock.stream308,mock.stream309,mock.stream310,mock.stream311,mock.stream312,mock.stream313,mock.stream314,mock.stream315,mock.stream316,mock.stream317,mock.stream318,mock.stream319,mock.stream320,mock.stream321,mock.stream322,mock.stream323,mock.stream324,mock.stream325,mock.stream326,mock.stream327,mock.stream328,mock.stream329,mock.stream330,mock.stream331,mock.stream332,mock.stream333,mock.stream334,mock.stream335,mock.stream336,mock.stream337,mock.stream33
8,mock.stream339,mock.stream340,mock.stream341,mock.stream342,mock.stream343,mock.stream344,mock.stream345,mock.stream346,mock.stream347,mock.stream348,mock.stream349,mock.stream350,mock.stream351,mock.stream352,mock.stream353,mock.stream354,mock.stream355,mock.stream356,mock.stream357,mock.stream358,mock.stream359,mock.stream360,mock.stream361,mock.stream362,mock.stream363,mock.stream364,mock.stream365,mock.stream366,mock.stream367,mock.stream368,mock.stream369,mock.stream370,mock.stream371,mock.stream372,mock.stream373,mock.stream374,mock.stream375,mock.stream376,mock.stream377,mock.stream378,mock.stream379,mock.stream380,mock.stream381,mock.stream382,mock.stream383,mock.stream384,mock.stream385,mock.stream386,mock.stream387,mock.stream388,mock.stream389,mock.stream390,mock.stream391,mock.stream392,mock.stream393,mock.stream394,mock.stream395,mock.stream396,mock.stream397,mock.stream398,mock.stream399,mock.stream400,mock.stream401,mock.stream402,mock.stream403,mock.stream404,mock.
stream405,mock.stream406,mock.stream407,mock.stream408,mock.stream409,mock.stream410,mock.stream411,mock.stream412,mock.stream413,mock.stream414,mock.stream415,mock.stream416,mock.stream417,mock.stream418,mock.stream419,mock.stream420,mock.stream421,mock.stream422,mock.stream423,mock.stream424,mock.stream425,mock.stream426,mock.stream427,mock.stream428,mock.stream429,mock.stream430,mock.stream431,mock.stream432,mock.stream433,mock.stream434,mock.stream435,mock.stream436,mock.stream437,mock.stream438,mock.stream439,mock.stream440,mock.stream441,mock.stream442,mock.stream443,mock.stream444,mock.stream445,mock.stream446,mock.stream447,mock.stream448,mock.stream449,mock.stream450,mock.stream451,mock.stream452,mock.stream453,mock.stream454,mock.stream455,mock.stream456,mock.stream457,mock.stream458,mock.stream459,mock.stream460,mock.stream461,mock.stream462,mock.stream463,mock.stream464,mock.stream465,mock.stream466,mock.stream467,mock.stream468,mock.stream469,mock.stream470,mock.stream4
71,mock.stream472,mock.stream473,mock.stream474,mock.stream475,mock.stream476,mock.stream477,mock.stream478,mock.stream479,mock.stream480,mock.stream481,mock.stream482,mock.stream483,mock.stream484,mock.stream485,mock.stream486,mock.stream487,mock.stream488,mock.stream489,mock.stream490,mock.stream491,mock.stream492,mock.stream493,mock.stream494,mock.stream495,mock.stream496,mock.stream497,mock.stream498,mock.stream499,mock.stream500,mock.stream501,mock.stream502,mock.stream503,mock.stream504,mock.stream505,mock.stream506,mock.stream507,mock.stream508,mock.stream509,mock.stream510,mock.stream511,mock.stream512,mock.stream513,mock.stream514,mock.stream515,mock.stream516,mock.stream517,mock.stream518,mock.stream519,mock.stream520,mock.stream521,mock.stream522,mock.stream523,mock.stream524,mock.stream525,mock.stream526,mock.stream527,mock.stream528,mock.stream529,mock.stream530,mock.stream531,mock.stream532,mock.stream533,mock.stream534,mock.stream535,mock.stream536,mock.stream537,mock
.stream538,mock.stream539,mock.stream540,mock.stream541,mock.stream542,mock.stream543,mock.stream544,mock.stream545,mock.stream546,mock.stream547,mock.stream548,mock.stream549,mock.stream550,mock.stream551,mock.stream552,mock.stream553,mock.stream554,mock.stream555,mock.stream556,mock.stream557,mock.stream558,mock.stream559,mock.stream560,mock.stream561,mock.stream562,mock.stream563,mock.stream564,mock.stream565,mock.stream566,mock.stream567,mock.stream568,mock.stream569,mock.stream570,mock.stream571,mock.stream572,mock.stream573,mock.stream574,mock.stream575,mock.stream576,mock.stream577,mock.stream578,mock.stream579,mock.stream580,mock.stream581,mock.stream582,mock.stream583,mock.stream584,mock.stream585,mock.stream586,mock.stream587,mock.stream588,mock.stream589,mock.stream590,mock.stream591,mock.stream592,mock.stream593,mock.stream594,mock.stream595,mock.stream596,mock.stream597,mock.stream598,mock.stream599,mock.stream600,mock.stream601,mock.stream602,mock.stream603,mock.stream
604,mock.stream605,mock.stream606,mock.stream607,mock.stream608,mock.stream609,mock.stream610,mock.stream611,mock.stream612,mock.stream613,mock.stream614,mock.stream615,mock.stream616,mock.stream617,mock.stream618,mock.stream619,mock.stream620,mock.stream621,mock.stream622,mock.stream623,mock.stream624,mock.stream625,mock.stream626,mock.stream627,mock.stream628,mock.stream629,mock.stream630,mock.stream631,mock.stream632,mock.stream633,mock.stream634,mock.stream635,mock.stream636,mock.stream637,mock.stream638,mock.stream639,mock.stream640,mock.stream641,mock.stream642,mock.stream643,mock.stream644,mock.stream645,mock.stream646,mock.stream647,mock.stream648,mock.stream649,mock.stream650,mock.stream651,mock.stream652,mock.stream653,mock.stream654,mock.stream655,mock.stream656,mock.stream657,mock.stream658,mock.stream659,mock.stream660,mock.stream661,mock.stream662,mock.stream663,mock.stream664,mock.stream665,mock.stream666,mock.stream667,mock.stream668,mock.stream669,mock.stream670,moc
k.stream671,mock.stream672,mock.stream673,mock.stream674,mock.stream675,mock.stream676,mock.stream677,mock.stream678,mock.stream679,mock.stream680,mock.stream681,mock.stream682,mock.stream683,mock.stream684,mock.stream685,mock.stream686,mock.stream687,mock.stream688,mock.stream689,mock.stream690,mock.stream691,mock.stream692,mock.stream693,mock.stream694,mock.stream695,mock.stream696,mock.stream697,mock.stream698,mock.stream699,mock.stream700,mock.stream701,mock.stream702,mock.stream703,mock.stream704,mock.stream705,mock.stream706,mock.stream707,mock.stream708,mock.stream709,mock.stream710,mock.stream711,mock.stream712,mock.stream713,mock.stream714,mock.stream715,mock.stream716,mock.stream717,mock.stream718,mock.stream719,mock.stream720,mock.stream721,mock.stream722,mock.stream723,mock.stream724,mock.stream725,mock.stream726,mock.stream727,mock.stream728,mock.stream729,mock.stream730,mock.stream731,mock.stream732,mock.stream733,mock.stream734,mock.stream735,mock.stream736,mock.strea
m737,mock.stream738,mock.stream739,mock.stream740,mock.stream741,mock.stream742,mock.stream743,mock.stream744,mock.stream745,mock.stream746,mock.stream747,mock.stream748,mock.stream749,mock.stream750,mock.stream751,mock.stream752,mock.stream753,mock.stream754,mock.stream755,mock.stream756,mock.stream757,mock.stream758,mock.stream759,mock.stream760,mock.stream761,mock.stream762,mock.stream763,mock.stream764,mock.stream765,mock.stream766,mock.stream767,mock.stream768,mock.stream769,mock.stream770,mock.stream771,mock.stream772,mock.stream773,mock.stream774,mock.stream775,mock.stream776,mock.stream777,mock.stream778,mock.stream779,mock.stream780,mock.stream781,mock.stream782,mock.stream783,mock.stream784,mock.stream785,mock.stream786,mock.stream787,mock.stream788,mock.stream789,mock.stream790,mock.stream791,mock.stream792,mock.stream793,mock.stream794,mock.stream795,mock.stream796,mock.stream797,mock.stream798,mock.stream799,mock.stream800,mock.stream801,mock.stream802,mock.stream803,mo
ck.stream804,mock.stream805,mock.stream806,mock.stream807,mock.stream808,mock.stream809,mock.stream810,mock.stream811,mock.stream812,mock.stream813,mock.stream814,mock.stream815,mock.stream816,mock.stream817,mock.stream818,mock.stream819,mock.stream820,mock.stream821,mock.stream822,mock.stream823,mock.stream824,mock.stream825,mock.stream826,mock.stream827,mock.stream828,mock.stream829,mock.stream830,mock.stream831,mock.stream832,mock.stream833,mock.stream834,mock.stream835,mock.stream836,mock.stream837,mock.stream838,mock.stream839,mock.stream840,mock.stream841,mock.stream842,mock.stream843,mock.stream844,mock.stream845,mock.stream846,mock.stream847,mock.stream848,mock.stream849,mock.stream850,mock.stream851,mock.stream852,mock.stream853,mock.stream854,mock.stream855,mock.stream856,mock.stream857,mock.stream858,mock.stream859,mock.stream860,mock.stream861,mock.stream862,mock.stream863,mock.stream864,mock.stream865,mock.stream866,mock.stream867,mock.stream868,mock.stream869,mock.stre
am870,mock.stream871,mock.stream872,mock.stream873,mock.stream874,mock.stream875,mock.stream876,mock.stream877,mock.stream878,mock.stream879,mock.stream880,mock.stream881,mock.stream882,mock.stream883,mock.stream884,mock.stream885,mock.stream886,mock.stream887,mock.stream888,mock.stream889,mock.stream890,mock.stream891,mock.stream892,mock.stream893,mock.stream894,mock.stream895,mock.stream896,mock.stream897,mock.stream898,mock.stream899,mock.stream900,mock.stream901,mock.stream902,mock.stream903,mock.stream904,mock.stream905,mock.stream906,mock.stream907,mock.stream908,mock.stream909,mock.stream910,mock.stream911,mock.stream912,mock.stream913,mock.stream914,mock.stream915,mock.stream916,mock.stream917,mock.stream918,mock.stream919,mock.stream920,mock.stream921,mock.stream922,mock.stream923,mock.stream924,mock.stream925,mock.stream926,mock.stream927,mock.stream928,mock.stream929,mock.stream930,mock.stream931,mock.stream932,mock.stream933,mock.stream934,mock.stream935,mock.stream936,m
ock.stream937,mock.stream938,mock.stream939,mock.stream940,mock.stream941,mock.stream942,mock.stream943,mock.stream944,mock.stream945,mock.stream946,mock.stream947,mock.stream948,mock.stream949,mock.stream950,mock.stream951,mock.stream952,mock.stream953,mock.stream954,mock.stream955,mock.stream956,mock.stream957,mock.stream958,mock.stream959,mock.stream960,mock.stream961,mock.stream962,mock.stream963,mock.stream964,mock.stream965,mock.stream966,mock.stream967,mock.stream968,mock.stream969,mock.stream970,mock.stream971,mock.stream972,mock.stream973,mock.stream974,mock.stream975,mock.stream976,mock.stream977,mock.stream978,mock.stream979,mock.stream980,mock.stream981,mock.stream982,mock.stream983,mock.stream984,mock.stream985,mock.stream986,mock.stream987,mock.stream988,mock.stream989,mock.stream990,mock.stream991,mock.stream992,mock.stream993,mock.stream994,mock.stream995,mock.stream996,mock.stream997,mock.stream998,mock.stream999,
-# Kafka System
+# Mock system which produces random messages
systems.mock.samza.factory=org.apache.samza.system.mock.MockSystemFactory
+
+# Kafka System (only used for coordinator stream in this test)
+systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
+systems.kafka.consumer.zookeeper.connect=localhost:2181/
+systems.kafka.producer.bootstrap.servers=localhost:9092
+
+#define coordinator system
+job.coordinator.system=kafka
+job.coordinator.replication.factor=1
+
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-test/src/main/config/perf/kafka-read-write-performance.properties
----------------------------------------------------------------------
diff --git a/samza-test/src/main/config/perf/kafka-read-write-performance.properties b/samza-test/src/main/config/perf/kafka-read-write-performance.properties
index 122b14a..112caba 100644
--- a/samza-test/src/main/config/perf/kafka-read-write-performance.properties
+++ b/samza-test/src/main/config/perf/kafka-read-write-performance.properties
@@ -33,3 +33,6 @@ systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
systems.kafka.samza.offset.default=oldest
systems.kafka.consumer.zookeeper.connect=localhost:2181/
systems.kafka.producer.bootstrap.servers=localhost:9092
+
+#Coordinator replication factor
+job.coordinator.replication.factor=1
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java b/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java
index c0a20af..8c95211 100644
--- a/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java
+++ b/samza-test/src/main/java/org/apache/samza/system/mock/MockSystemAdmin.java
@@ -22,6 +22,7 @@ package org.apache.samza.system.mock;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
+
import org.apache.samza.Partition;
import org.apache.samza.SamzaException;
import org.apache.samza.system.SystemAdmin;
@@ -54,12 +55,7 @@ public class MockSystemAdmin implements SystemAdmin {
return metadata;
}
- @Override
- public void createChangelogStream(String streamName, int numOfPartitions) {
- throw new SamzaException("Method not implemented");
- }
-
- @Override
+ @Override
public Map<SystemStreamPartition, String> getOffsetsAfter(Map<SystemStreamPartition, String> offsets) {
Map<SystemStreamPartition, String> offsetsAfter = new HashMap<SystemStreamPartition, String>();
@@ -69,4 +65,14 @@ public class MockSystemAdmin implements SystemAdmin {
return offsetsAfter;
}
+
+ @Override
+ public void createChangelogStream(String streamName, int numOfPartitions) {
+ throw new UnsupportedOperationException("Method not implemented");
+ }
+
+ @Override
+ public void createCoordinatorStream(String streamName) {
+ throw new UnsupportedOperationException("Method not implemented.");
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-test/src/main/python/samza_failure_testing.py
----------------------------------------------------------------------
diff --git a/samza-test/src/main/python/samza_failure_testing.py b/samza-test/src/main/python/samza_failure_testing.py
index 198db26..28a56b1 100755
--- a/samza-test/src/main/python/samza_failure_testing.py
+++ b/samza-test/src/main/python/samza_failure_testing.py
@@ -1,3 +1,5 @@
+#!/usr/bin/env python
+
#################################################################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-test/src/main/scala/org/apache/samza/test/performance/TestPerformanceTask.scala
----------------------------------------------------------------------
diff --git a/samza-test/src/main/scala/org/apache/samza/test/performance/TestPerformanceTask.scala b/samza-test/src/main/scala/org/apache/samza/test/performance/TestPerformanceTask.scala
index 45c76e8..d1f1d84 100644
--- a/samza-test/src/main/scala/org/apache/samza/test/performance/TestPerformanceTask.scala
+++ b/samza-test/src/main/scala/org/apache/samza/test/performance/TestPerformanceTask.scala
@@ -27,10 +27,10 @@ import org.apache.samza.task.StreamTask
import org.apache.samza.task.TaskCoordinator
import org.apache.samza.task.TaskCoordinator.RequestScope
import org.apache.samza.config.Config
-import org.apache.samza.util.Logging
+import org.apache.samza.util.{Util, Logging}
import org.apache.samza.system.SystemStream
import org.apache.samza.system.OutgoingMessageEnvelope
-import org.apache.samza.util.Util
+
object TestPerformanceTask {
// No thread safety is needed for these variables because they're mutated in
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
----------------------------------------------------------------------
diff --git a/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala b/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
index d66b3bd..8200696 100644
--- a/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
+++ b/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
@@ -22,6 +22,8 @@ package org.apache.samza.test.integration
import java.util.Properties
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
+import java.util
+
import kafka.admin.AdminUtils
import kafka.consumer.Consumer
import kafka.consumer.ConsumerConfig
@@ -54,14 +56,15 @@ import org.apache.samza.task.TaskCoordinator
import org.apache.samza.task.TaskCoordinator.RequestScope
import org.apache.samza.util.ClientUtilTopicMetadataStore
import org.apache.samza.util.TopicMetadataStore
+import org.apache.samza.job.JobRunner
import org.junit.Assert._
import org.junit.{BeforeClass, AfterClass, Test}
import scala.collection.JavaConversions._
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap
import scala.collection.mutable.SynchronizedMap
+import org.apache.samza.job.JobRunner
import org.apache.kafka.clients.producer.{ProducerConfig, Producer, ProducerRecord, KafkaProducer}
-import java.util
import org.apache.samza.util.KafkaUtil
object TestStatefulTask {
@@ -193,10 +196,9 @@ object TestStatefulTask {
*/
class TestStatefulTask {
import TestStatefulTask._
- val jobFactory = new ThreadJobFactory
val jobConfig = Map(
- "job.factory.class" -> jobFactory.getClass.getCanonicalName,
+ "job.factory.class" -> classOf[ThreadJobFactory].getCanonicalName,
"job.name" -> "hello-stateful-world",
"task.class" -> "org.apache.samza.test.integration.TestTask",
"task.inputs" -> "kafka.input",
@@ -206,7 +208,6 @@ class TestStatefulTask {
"stores.mystore.msg.serde" -> "string",
"stores.mystore.changelog" -> "kafka.mystoreChangelog",
"stores.mystore.changelog.replication.factor" -> "1",
-
"systems.kafka.samza.factory" -> "org.apache.samza.system.kafka.KafkaSystemFactory",
// Always start consuming at offset 0. This avoids a race condition between
// the producer and the consumer in this test (SAMZA-166, SAMZA-224).
@@ -319,21 +320,15 @@ class TestStatefulTask {
* time, number of partitions, etc.
*/
def startJob = {
- val job = jobFactory.getJob(new MapConfig(jobConfig))
-
// Start task.
- job.submit
+ val job = new JobRunner(new MapConfig(jobConfig)).run
assertEquals(ApplicationStatus.Running, job.waitForStatus(ApplicationStatus.Running, 60000))
TestTask.awaitTaskRegistered
val tasks = TestTask.tasks
-
assertEquals("Should only have a single partition in this task", 1, tasks.size)
-
val task = tasks.values.toList.head
-
task.initFinished.await(60, TimeUnit.SECONDS)
assertEquals(0, task.initFinished.getCount)
-
(job, task)
}
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-yarn/src/main/scala/org/apache/samza/config/YarnConfig.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/config/YarnConfig.scala b/samza-yarn/src/main/scala/org/apache/samza/config/YarnConfig.scala
index 5da1c35..1f51551 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/config/YarnConfig.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/config/YarnConfig.scala
@@ -26,7 +26,6 @@ object YarnConfig {
val CONTAINER_MAX_CPU_CORES = "yarn.container.cpu.cores"
val CONTAINER_RETRY_COUNT = "yarn.container.retry.count"
val CONTAINER_RETRY_WINDOW_MS = "yarn.container.retry.window.ms"
- val CONTAINER_COUNT = "yarn.container.count"
val AM_JVM_OPTIONS = "yarn.am.opts"
val AM_JMX_ENABLED = "yarn.am.jmx.enabled"
val AM_CONTAINER_MAX_MEMORY_MB = "yarn.am.container.memory.mb"
@@ -47,8 +46,6 @@ class YarnConfig(config: Config) extends ScalaMapConfig(config) {
def getPackagePath = getOption(YarnConfig.PACKAGE_PATH)
- def getContainerCount: Option[Int] = getOption(YarnConfig.CONTAINER_COUNT).map(_.toInt)
-
def getAmOpts = getOption(YarnConfig.AM_JVM_OPTIONS)
def getAMContainerMaxMemoryMb: Option[Int] = getOption(YarnConfig.AM_CONTAINER_MAX_MEMORY_MB).map(_.toInt)
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala
index a1dbe04..20aa373 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala
@@ -39,6 +39,7 @@ import org.apache.samza.metrics.MetricsRegistryMap
import org.apache.samza.util.hadoop.HttpFileSystem
import org.apache.samza.util.Logging
import org.apache.samza.serializers.model.SamzaObjectMapper
+import org.apache.samza.coordinator.JobCoordinator
import org.apache.samza.SamzaException
/**
@@ -70,8 +71,12 @@ object SamzaAppMaster extends Logging with AMRMClientAsync.CallbackHandler {
info("got node manager port: %s" format nodePortString)
val nodeHttpPortString = System.getenv(ApplicationConstants.Environment.NM_HTTP_PORT.toString)
info("got node manager http port: %s" format nodeHttpPortString)
- val config = new MapConfig(SamzaObjectMapper.getObjectMapper.readValue(System.getenv(ShellCommandConfig.ENV_CONFIG), classOf[Config]))
- info("got config: %s" format config)
+ val coordinatorSystemConfig = new MapConfig(SamzaObjectMapper.getObjectMapper.readValue(System.getenv(ShellCommandConfig.ENV_COORDINATOR_SYSTEM_CONFIG), classOf[Config]))
+ info("got coordinator system config: %s" format coordinatorSystemConfig)
+ val registry = new MetricsRegistryMap
+ val jobCoordinator = JobCoordinator(coordinatorSystemConfig, registry)
+ val config = jobCoordinator.jobModel.getConfig
+ info("got config: %s" format coordinatorSystemConfig)
putMDC("jobName", config.getName.getOrElse(throw new SamzaException("can not find the job name")))
putMDC("jobId", config.getJobId.getOrElse("1"))
val hConfig = new YarnConfiguration
@@ -79,14 +84,13 @@ object SamzaAppMaster extends Logging with AMRMClientAsync.CallbackHandler {
val interval = config.getAMPollIntervalMs.getOrElse(DEFAULT_POLL_INTERVAL_MS)
val amClient = AMRMClientAsync.createAMRMClientAsync[ContainerRequest](interval, this)
val clientHelper = new ClientHelper(hConfig)
- val registry = new MetricsRegistryMap
val containerMem = config.getContainerMaxMemoryMb.getOrElse(DEFAULT_CONTAINER_MEM)
val containerCpu = config.getContainerMaxCpuCores.getOrElse(DEFAULT_CPU_CORES)
val jmxServer = if (new YarnConfig(config).getJmxServerEnabled) Some(new JmxServer()) else None
try {
// wire up all of the yarn event listeners
- val state = new SamzaAppMasterState(-1, containerId, nodeHostString, nodePortString.toInt, nodeHttpPortString.toInt)
+ val state = new SamzaAppMasterState(jobCoordinator, -1, containerId, nodeHostString, nodePortString.toInt, nodeHttpPortString.toInt)
val service = new SamzaAppMasterService(config, state, registry, clientHelper)
val lifecycle = new SamzaAppMasterLifecycle(containerMem, containerCpu, state, amClient)
val metrics = new SamzaAppMasterMetrics(config, state, registry)
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala
index b1e5546..1445605 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala
@@ -33,7 +33,7 @@ import org.apache.samza.coordinator.JobCoordinator
* responses from YARN's resource manager (via SamzaAppMasterTaskManager). This
* class holds the current state of the application master.
*/
-class SamzaAppMasterState(val taskId: Int, val containerId: ContainerId, val nodeHost: String, val nodePort: Int, val nodeHttpPort: Int) extends YarnAppMasterListener with Logging {
+class SamzaAppMasterState(val jobCoordinator: JobCoordinator, val taskId: Int, val containerId: ContainerId, val nodeHost: String, val nodePort: Int, val nodeHttpPort: Int) extends YarnAppMasterListener with Logging {
// controlled by the AM
var completedContainers = 0
var neededContainers = 0
@@ -43,7 +43,6 @@ class SamzaAppMasterState(val taskId: Int, val containerId: ContainerId, val nod
var runningContainers = Map[Int, YarnContainer]()
var unclaimedContainers = Set[Int]()
var finishedContainers = Set[Int]()
- var jobCoordinator: JobCoordinator = null
var status = FinalApplicationStatus.UNDEFINED
var jobHealthy = true
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala
index 38e1f5f..1743c86 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala
@@ -37,13 +37,13 @@ import org.apache.hadoop.yarn.util.ConverterUtils
import org.apache.hadoop.yarn.util.Records
import org.apache.samza.config.Config
import org.apache.samza.config.TaskConfig.Config2Task
-import org.apache.samza.config.YarnConfig
import org.apache.samza.config.YarnConfig.Config2Yarn
import org.apache.samza.job.CommandBuilder
import org.apache.samza.job.ShellCommandBuilder
-import org.apache.samza.util.Util
import org.apache.samza.util.Logging
import org.apache.samza.coordinator.JobCoordinator
+import org.apache.samza.util.{Util, Logging}
+import org.apache.samza.config.JobConfig.Config2Job
object SamzaAppMasterTaskManager {
val DEFAULT_CONTAINER_MEM = 1024
@@ -63,13 +63,7 @@ case class ContainerFailure(val count: Int, val lastFailure: Long)
class SamzaAppMasterTaskManager(clock: () => Long, config: Config, state: SamzaAppMasterState, amClient: AMRMClientAsync[ContainerRequest], conf: YarnConfiguration) extends YarnAppMasterListener with Logging {
import SamzaAppMasterTaskManager._
- state.containerCount = config
- .getContainerCount
- .getOrElse({
- info("No %s specified. Defaulting to one container." format YarnConfig.CONTAINER_COUNT)
- 1
- })
- state.jobCoordinator = JobCoordinator(config, state.containerCount)
+ state.containerCount = config.getContainerCount
var containerFailures = Map[Int, ContainerFailure]()
var tooManyFailedContainers = false
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala
index 24b11da..8dd70c9 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala
@@ -24,6 +24,7 @@ import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.yarn.api.records.ApplicationId
import org.apache.hadoop.yarn.api.ApplicationConstants
import org.apache.samza.config.Config
+import org.apache.samza.config.JobConfig
import org.apache.samza.util.Util
import org.apache.samza.job.ApplicationStatus
import org.apache.samza.job.ApplicationStatus.Running
@@ -36,6 +37,11 @@ import org.apache.samza.config.YarnConfig
import org.apache.samza.config.ShellCommandConfig
import org.apache.samza.SamzaException
import org.apache.samza.serializers.model.SamzaObjectMapper
+import org.apache.samza.config.JobConfig.Config2Job
+import scala.collection.JavaConversions._
+import org.apache.samza.config.MapConfig
+import org.apache.samza.config.ConfigException
+import org.apache.samza.config.SystemConfig
object YarnJob {
val DEFAULT_AM_CONTAINER_MEM = 1024
@@ -59,8 +65,9 @@ class YarnJob(config: Config, hadoopConfig: Configuration) extends StreamJob {
"export SAMZA_LOG_DIR=%s && ln -sfn %s logs && exec ./__package/bin/run-am.sh 1>logs/%s 2>logs/%s"
format (ApplicationConstants.LOG_DIR_EXPANSION_VAR, ApplicationConstants.LOG_DIR_EXPANSION_VAR, ApplicationConstants.STDOUT, ApplicationConstants.STDERR)),
Some({
+ val coordinatorSystemConfig = Util.buildCoordinatorStreamConfig(config)
val envMap = Map(
- ShellCommandConfig.ENV_CONFIG -> Util.envVarEscape(SamzaObjectMapper.getObjectMapper.writeValueAsString(config)),
+ ShellCommandConfig.ENV_COORDINATOR_SYSTEM_CONFIG -> Util.envVarEscape(SamzaObjectMapper.getObjectMapper.writeValueAsString(coordinatorSystemConfig)),
ShellCommandConfig.ENV_JAVA_OPTS -> Util.envVarEscape(config.getAmOpts.getOrElse("")))
val envMapWithJavaHome = config.getAMJavaHome match {
case Some(javaHome) => envMap + (ShellCommandConfig.ENV_JAVA_HOME -> javaHome)
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterLifecycle.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterLifecycle.scala b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterLifecycle.scala
index 765f72f..df5992e 100644
--- a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterLifecycle.scala
+++ b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterLifecycle.scala
@@ -34,8 +34,10 @@ import org.junit.Test
import scala.annotation.elidable
import scala.annotation.elidable.ASSERTION
import java.net.URL
+import org.apache.samza.coordinator.JobCoordinator
class TestSamzaAppMasterLifecycle {
+ val coordinator = new JobCoordinator(null, null, null)
val amClient = new AMRMClientAsyncImpl[ContainerRequest](1, Mockito.mock(classOf[CallbackHandler])) {
var host = ""
var port = 0
@@ -79,7 +81,7 @@ class TestSamzaAppMasterLifecycle {
@Test
def testLifecycleShouldRegisterOnInit {
- val state = new SamzaAppMasterState(-1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "test", 1, 2)
+ val state = new SamzaAppMasterState(coordinator, -1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "test", 1, 2)
state.rpcUrl = new URL("http://localhost:1")
state.trackingUrl = new URL("http://localhost:2")
val saml = new SamzaAppMasterLifecycle(512, 2, state, amClient)
@@ -91,7 +93,7 @@ class TestSamzaAppMasterLifecycle {
@Test
def testLifecycleShouldUnregisterOnShutdown {
- val state = new SamzaAppMasterState(-1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "", 1, 2)
+ val state = new SamzaAppMasterState(coordinator, -1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "", 1, 2)
state.status = FinalApplicationStatus.SUCCEEDED
new SamzaAppMasterLifecycle(128, 1, state, amClient).onShutdown
assertEquals(FinalApplicationStatus.SUCCEEDED, amClient.status)
@@ -111,7 +113,7 @@ class TestSamzaAppMasterLifecycle {
@Test
def testLifecycleShouldShutdownOnInvalidContainerSettings {
- val state = new SamzaAppMasterState(-1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "test", 1, 2)
+ val state = new SamzaAppMasterState(coordinator, -1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "test", 1, 2)
state.rpcUrl = new URL("http://localhost:1")
state.trackingUrl = new URL("http://localhost:2")
List(new SamzaAppMasterLifecycle(768, 1, state, amClient),
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterService.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterService.scala b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterService.scala
index 81dea9d..6f4bfaf 100644
--- a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterService.scala
+++ b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterService.scala
@@ -24,6 +24,7 @@ import java.net.URL
import java.io.InputStreamReader
import org.apache.hadoop.yarn.util.ConverterUtils
import org.apache.samza.config.MapConfig
+import org.apache.samza.metrics.MetricsRegistryMap
import org.junit.Assert._
import org.junit.Test
import scala.collection.JavaConversions._
@@ -32,17 +33,16 @@ import org.apache.samza.container.TaskName
import org.apache.samza.system.SystemStreamPartition
import org.apache.samza.Partition
import org.apache.samza.coordinator.JobCoordinator
+import org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory
class TestSamzaAppMasterService {
@Test
def testAppMasterDashboardShouldStart {
val config = getDummyConfig
- val state = new SamzaAppMasterState(-1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"), "", 1, 2)
+ val state = new SamzaAppMasterState(JobCoordinator(config), -1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"), "", 1, 2)
val service = new SamzaAppMasterService(config, state, null, null)
val taskName = new TaskName("test")
- state.jobCoordinator = JobCoordinator(config, 1)
-
// start the dashboard
service.onInit
assertTrue(state.rpcUrl.getPort > 0)
@@ -70,11 +70,9 @@ class TestSamzaAppMasterService {
def testAppMasterDashboardWebServiceShouldStart {
// Create some dummy config
val config = getDummyConfig
- val state = new SamzaAppMasterState(-1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"), "", 1, 2)
+ val state = new SamzaAppMasterState(JobCoordinator(config), -1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000002"), "", 1, 2)
val service = new SamzaAppMasterService(config, state, null, null)
- state.jobCoordinator = JobCoordinator(config, 1)
-
// start the dashboard
service.onInit
assertTrue(state.rpcUrl.getPort > 0)
@@ -94,6 +92,7 @@ class TestSamzaAppMasterService {
}
private def getDummyConfig: Config = new MapConfig(Map[String, String](
+ "job.name" -> "test",
"yarn.container.count" -> "1",
"systems.test-system.samza.factory" -> "org.apache.samza.job.yarn.MockSystemFactory",
"yarn.container.memory.mb" -> "512",
@@ -102,5 +101,7 @@ class TestSamzaAppMasterService {
"systems.test-system.samza.key.serde" -> "org.apache.samza.serializers.JsonSerde",
"systems.test-system.samza.msg.serde" -> "org.apache.samza.serializers.JsonSerde",
"yarn.container.retry.count" -> "1",
- "yarn.container.retry.window.ms" -> "1999999999"))
+ "yarn.container.retry.window.ms" -> "1999999999",
+ "job.coordinator.system" -> "coordinator",
+ "systems.coordinator.samza.factory" -> classOf[MockCoordinatorStreamSystemFactory].getCanonicalName))
}
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
index ee2d0ea..1e936b4 100644
--- a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
+++ b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterTaskManager.scala
@@ -31,12 +31,12 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.util.ConverterUtils
import org.apache.samza.Partition
import org.apache.samza.config.Config
+import org.apache.samza.config.JobConfig.Config2Job
import org.apache.samza.config.YarnConfig.Config2Yarn
import org.apache.samza.config.MapConfig
import org.apache.samza.metrics.MetricsRegistry
import org.apache.samza.system.SystemFactory
import org.apache.samza.system.SystemStreamPartition
-import org.apache.samza.util.Util
import org.junit.Test
import scala.collection.JavaConversions._
import TestSamzaAppMasterTaskManager._
@@ -44,6 +44,11 @@ import java.net.URL
import org.apache.samza.system.SystemAdmin
import org.apache.samza.system.SystemStreamMetadata
import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
+import org.apache.samza.coordinator.JobCoordinator
+import org.apache.samza.job.model.JobModel
+import org.apache.samza.job.model.ContainerModel
+import org.apache.samza.container.TaskName
+import org.apache.samza.job.model.TaskModel
object TestSamzaAppMasterTaskManager {
def getContainer(containerId: ContainerId) = new Container {
@@ -151,16 +156,26 @@ class TestSamzaAppMasterTaskManager {
"yarn.container.retry.count" -> "1",
"yarn.container.retry.window.ms" -> "1999999999"))
+ def getCoordinator(containerCount: Int = 1) = {
+ val containers = new java.util.HashMap[java.lang.Integer, ContainerModel]()
+ (0 until containerCount).foreach(idx => {
+ val container = new ContainerModel(idx, Map[TaskName, TaskModel]())
+ containers.put(new java.lang.Integer(idx), container)
+ })
+ val jobModel = new JobModel(config, containers)
+ new JobCoordinator(jobModel, null, null)
+ }
+
@Test
- def testAppMasterShouldDefaultToOneContainerIfContainerCountIsNotSpecified {
- val state = new SamzaAppMasterState(-1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "", 1, 2)
+ def testAppMasterShouldDefaultToOneContainerIfTaskCountIsNotSpecified {
+ val state = new SamzaAppMasterState(getCoordinator(), -1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "", 1, 2)
val taskManager = new SamzaAppMasterTaskManager(clock, config, state, null, new YarnConfiguration)
assertEquals(1, state.containerCount)
}
@Test
def testAppMasterShouldStopWhenContainersFinish {
- val state = new SamzaAppMasterState(-1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "", 1, 2)
+ val state = new SamzaAppMasterState(getCoordinator(), -1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "", 1, 2)
val taskManager = new SamzaAppMasterTaskManager(clock, config, state, null, new YarnConfiguration)
assertFalse(taskManager.shouldShutdown)
@@ -175,7 +190,7 @@ class TestSamzaAppMasterTaskManager {
@Test
def testAppMasterShouldRequestANewContainerWhenATaskFails {
val amClient = getAmClient(new TestAMRMClientImpl(getAppMasterResponse(false, List(), List())))
- val state = new SamzaAppMasterState(-1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "", 1, 2)
+ val state = new SamzaAppMasterState(getCoordinator(), -1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "", 1, 2)
state.coordinatorUrl = new URL("http://localhost:1234")
val taskManager = new SamzaAppMasterTaskManager(clock, config, state, amClient, new YarnConfiguration) {
override def startContainer(packagePath: Path, container: Container, env: Map[String, String], cmds: String*) {
@@ -210,7 +225,7 @@ class TestSamzaAppMasterTaskManager {
@Test
def testAppMasterShouldRequestANewContainerWhenATaskIsReleased {
val amClient = getAmClient(new TestAMRMClientImpl(getAppMasterResponse(false, List(), List())))
- val state = new SamzaAppMasterState(-1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "", 1, 2)
+ val state = new SamzaAppMasterState(getCoordinator(), -1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "", 1, 2)
state.coordinatorUrl = new URL("http://localhost:1234")
state.containerCount = 2
var containersRequested = 0
@@ -286,7 +301,7 @@ class TestSamzaAppMasterTaskManager {
map.put("yarn.container.count", "2")
val newConfig = new MapConfig(map)
val amClient = getAmClient(new TestAMRMClientImpl(getAppMasterResponse(false, List(), List())))
- val state = new SamzaAppMasterState(-1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "", 1, 2)
+ val state = new SamzaAppMasterState(getCoordinator(2), -1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "", 1, 2)
state.containerCount = 2
state.coordinatorUrl = new URL("http://localhost:1234")
var containersStarted = 0
@@ -348,7 +363,7 @@ class TestSamzaAppMasterTaskManager {
@Test
def testAppMasterShouldReleaseExtraContainers {
val amClient = getAmClient(new TestAMRMClientImpl(getAppMasterResponse(false, List(), List())))
- val state = new SamzaAppMasterState(-1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "", 1, 2)
+ val state = new SamzaAppMasterState(getCoordinator(), -1, ConverterUtils.toContainerId("container_1350670447861_0003_01_000001"), "", 1, 2)
state.coordinatorUrl = new URL("http://localhost:1234")
var containersRequested = 0
var containersStarted = 0
@@ -403,8 +418,7 @@ class MockSystemFactory extends SystemFactory {
}
def getAdmin(systemName: String, config: Config) = {
- val containerCount = config.getContainerCount.getOrElse(1)
- new MockSystemAdmin(containerCount)
+ new MockSystemAdmin(config.getContainerCount)
}
}
@@ -422,5 +436,11 @@ class MockSystemAdmin(numTasks: Int) extends SystemAdmin {
}).toMap[String, SystemStreamMetadata]
}
- override def createChangelogStream(streamName: String, numOfPartitions: Int) = ???
+ override def createChangelogStream(topicName: String, numOfChangeLogPartitions: Int) {
+ new UnsupportedOperationException("Method not implemented.")
+ }
+
+ override def createCoordinatorStream(streamName: String) {
+ new UnsupportedOperationException("Method not implemented.")
+ }
}
[2/4] samza git commit: SAMZA-465: Use coordinator stream and
eliminate CheckpointManager
Posted by ni...@apache.org.
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/test/scala/org/apache/samza/checkpoint/file/TestFileSystemCheckpointManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/checkpoint/file/TestFileSystemCheckpointManager.scala b/samza-core/src/test/scala/org/apache/samza/checkpoint/file/TestFileSystemCheckpointManager.scala
deleted file mode 100644
index 10ff1f4..0000000
--- a/samza-core/src/test/scala/org/apache/samza/checkpoint/file/TestFileSystemCheckpointManager.scala
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.checkpoint.file
-
-import java.io.File
-import scala.collection.JavaConversions._
-import java.util.Random
-import org.junit.Assert._
-import org.junit.{After, Before, Test}
-import org.apache.samza.SamzaException
-import org.apache.samza.Partition
-import org.apache.samza.checkpoint.Checkpoint
-import org.apache.samza.system.SystemStreamPartition
-import org.apache.samza.container.TaskName
-import org.junit.rules.TemporaryFolder
-
-class TestFileSystemCheckpointManager {
- val checkpointRoot = System.getProperty("java.io.tmpdir") // TODO: Move this out of tmp, into our build dir
- val taskName = new TaskName("Warwickshire")
- val baseFileLocation = new File(checkpointRoot)
-
- val tempFolder = new TemporaryFolder
-
- @Before
- def createTempFolder = tempFolder.create()
-
- @After
- def deleteTempFolder = tempFolder.delete()
-
- @Test
- def testReadForCheckpointFileThatDoesNotExistShouldReturnNull {
- val cpm = new FileSystemCheckpointManager("some-job-name", tempFolder.getRoot)
- assertNull(cpm.readLastCheckpoint(taskName))
- }
-
- @Test
- def testReadForCheckpointFileThatDoesExistShouldReturnProperCheckpoint {
- val cp = new Checkpoint(Map(
- new SystemStreamPartition("a", "b", new Partition(0)) -> "c",
- new SystemStreamPartition("a", "c", new Partition(1)) -> "d",
- new SystemStreamPartition("b", "d", new Partition(2)) -> "e"))
-
- var readCp:Checkpoint = null
- val cpm = new FileSystemCheckpointManager("some-job-name", tempFolder.getRoot)
-
- cpm.start
- cpm.writeCheckpoint(taskName, cp)
- readCp = cpm.readLastCheckpoint(taskName)
- cpm.stop
-
- assertNotNull(readCp)
- cp.equals(readCp)
- assertEquals(cp.getOffsets.keySet(), readCp.getOffsets.keySet())
- assertEquals(cp.getOffsets, readCp.getOffsets)
- assertEquals(cp, readCp)
- }
-
- @Test
- def testMissingRootDirectoryShouldFailOnManagerCreation {
- val cpm = new FileSystemCheckpointManager("some-job-name", new File(checkpointRoot + new Random().nextInt))
- try {
- cpm.start
- fail("Expected an exception since root directory for fs checkpoint manager doesn't exist.")
- } catch {
- case e: SamzaException => None // this is expected
- }
- cpm.stop
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
index cab31ca..58d7fe8 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
@@ -19,6 +19,7 @@
package org.apache.samza.container
+import java.util
import scala.collection.JavaConversions._
import org.apache.samza.Partition
import org.apache.samza.config.Config
@@ -29,7 +30,6 @@ import org.apache.samza.coordinator.server.JobServlet
import org.apache.samza.job.model.ContainerModel
import org.apache.samza.job.model.JobModel
import org.apache.samza.job.model.TaskModel
-import org.apache.samza.metrics.JmxServer
import org.apache.samza.serializers.SerdeManager
import org.apache.samza.system.IncomingMessageEnvelope
import org.apache.samza.system.StreamMetadataCache
@@ -54,21 +54,25 @@ import org.scalatest.junit.AssertionsForJUnit
import java.lang.Thread.UncaughtExceptionHandler
import org.apache.samza.serializers._
import org.apache.samza.SamzaException
+import org.apache.samza.checkpoint.CheckpointManager
class TestSamzaContainer extends AssertionsForJUnit {
@Test
def testReadJobModel {
val config = new MapConfig(Map("a" -> "b"))
+ val offsets = new util.HashMap[SystemStreamPartition, String]();
+ offsets.put(new SystemStreamPartition("system","stream", new Partition(0)), "1")
val tasks = Map(
- new TaskName("t1") -> new TaskModel(new TaskName("t1"), Set[SystemStreamPartition](), new Partition(0)),
- new TaskName("t2") -> new TaskModel(new TaskName("t2"), Set[SystemStreamPartition](), new Partition(0)))
+ new TaskName("t1") -> new TaskModel(new TaskName("t1"), offsets, new Partition(0)),
+ new TaskName("t2") -> new TaskModel(new TaskName("t2"), offsets, new Partition(0)))
val containers = Map(
Integer.valueOf(0) -> new ContainerModel(0, tasks),
Integer.valueOf(1) -> new ContainerModel(1, tasks))
val jobModel = new JobModel(config, containers)
+ def jobModelGenerator(): JobModel = jobModel
val server = new HttpServer
- val coordinator = new JobCoordinator(jobModel, server)
- coordinator.server.addServlet("/*", new JobServlet(jobModel))
+ val coordinator = new JobCoordinator(jobModel, server, new MockCheckpointManager)
+ coordinator.server.addServlet("/*", new JobServlet(jobModelGenerator))
try {
coordinator.start
assertEquals(jobModel, SamzaContainer.readJobModel(server.getUrl.toString))
@@ -180,7 +184,7 @@ class TestSamzaContainer extends AssertionsForJUnit {
val config = new MapConfig
assertTrue(defaultSerdesFromSerdeName("byte", "testSystemException", config).isInstanceOf[ByteSerde])
assertTrue(defaultSerdesFromSerdeName("integer", "testSystemException", config).isInstanceOf[IntegerSerde])
- assertTrue(defaultSerdesFromSerdeName("json", "testSystemException", config).isInstanceOf[JsonSerde])
+ assertTrue(defaultSerdesFromSerdeName("json", "testSystemException", config).isInstanceOf[JsonSerde[Object]])
assertTrue(defaultSerdesFromSerdeName("long", "testSystemException", config).isInstanceOf[LongSerde])
assertTrue(defaultSerdesFromSerdeName("serializable", "testSystemException", config).isInstanceOf[SerializableSerde[java.io.Serializable @unchecked]])
assertTrue(defaultSerdesFromSerdeName("string", "testSystemException", config).isInstanceOf[StringSerde])
@@ -196,3 +200,8 @@ class TestSamzaContainer extends AssertionsForJUnit {
assertTrue(throwSamzaException)
}
}
+
+class MockCheckpointManager extends CheckpointManager(null, null) {
+ override def start() = {}
+ override def stop() = {}
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/test/scala/org/apache/samza/container/grouper/task/TestGroupByContainerCount.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/grouper/task/TestGroupByContainerCount.scala b/samza-core/src/test/scala/org/apache/samza/container/grouper/task/TestGroupByContainerCount.scala
index 1ee5c06..2c7cb28 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/grouper/task/TestGroupByContainerCount.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/grouper/task/TestGroupByContainerCount.scala
@@ -68,6 +68,6 @@ class TestGroupByContainerCount {
}
private def getTaskModel(name: String, partitionId: Int) = {
- new TaskModel(new TaskName(name), Set[SystemStreamPartition](), new Partition(partitionId))
+ new TaskModel(new TaskName(name), Map[SystemStreamPartition, String](), new Partition(partitionId))
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala b/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
index a8e5d36..d9ae187 100644
--- a/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
+++ b/samza-core/src/test/scala/org/apache/samza/coordinator/TestJobCoordinator.scala
@@ -19,17 +19,19 @@
package org.apache.samza.coordinator
-import org.junit.Test
+
+import java.net.SocketTimeoutException
+
+import org.apache.samza.util.Util
+import org.junit.{After, Test}
import org.junit.Assert._
+import org.junit.rules.ExpectedException
import scala.collection.JavaConversions._
import org.apache.samza.config.MapConfig
import org.apache.samza.config.TaskConfig
import org.apache.samza.config.SystemConfig
-import org.apache.samza.container.TaskName
-import org.apache.samza.checkpoint.Checkpoint
-import org.apache.samza.checkpoint.CheckpointManagerFactory
-import org.apache.samza.checkpoint.CheckpointManager
-import org.apache.samza.metrics.MetricsRegistry
+import org.apache.samza.container.{SamzaContainer, TaskName}
+import org.apache.samza.metrics.{MetricsRegistryMap, MetricsRegistry}
import org.apache.samza.config.Config
import org.apache.samza.system.SystemFactory
import org.apache.samza.system.SystemAdmin
@@ -40,67 +42,209 @@ import org.apache.samza.Partition
import org.apache.samza.job.model.JobModel
import org.apache.samza.job.model.ContainerModel
import org.apache.samza.job.model.TaskModel
+import org.apache.samza.config.JobConfig
+import org.apache.samza.system.IncomingMessageEnvelope
+import org.apache.samza.system.SystemConsumer
+import org.apache.samza.coordinator.stream.{MockCoordinatorStreamWrappedConsumer, MockCoordinatorStreamSystemFactory}
class TestJobCoordinator {
/**
- * Builds a coordinator from config, and then compares it with what was
- * expected. We simulate having a checkpoint manager that has 2 task
- * changelog entries, and our model adds a third task. Expectation is that
- * the JobCoordinator will assign the new task with a new changelog
- * partition.
+ * Builds a coordinator from config, and then compares it with what was
+ * expected. We simulate having a checkpoint manager that has 2 task
+ * changelog entries, and our model adds a third task. Expectation is that
+ * the JobCoordinator will assign the new task with a new changelog
+ * partition
*/
@Test
def testJobCoordinator {
- val containerCount = 2
- val config = new MapConfig(Map(
- TaskConfig.CHECKPOINT_MANAGER_FACTORY -> classOf[MockCheckpointManagerFactory].getCanonicalName,
- TaskConfig.INPUT_STREAMS -> "test.stream1",
- (SystemConfig.SYSTEM_FACTORY format "test") -> classOf[MockSystemFactory].getCanonicalName))
- val coordinator = JobCoordinator(config, containerCount)
+ val task0Name = new TaskName("Partition 0")
+ val checkpoint0 = Map(new SystemStreamPartition("test", "stream1", new Partition(0)) -> "4")
+ val task1Name = new TaskName("Partition 1")
+ val checkpoint1 = Map(new SystemStreamPartition("test", "stream1", new Partition(1)) -> "3")
+ val task2Name = new TaskName("Partition 2")
+ val checkpoint2 = Map(new SystemStreamPartition("test", "stream1", new Partition(2)) -> null)
- // Construct the expected JobModel, so we can compare it to
+ // Construct the expected JobModel, so we can compare it to
// JobCoordinator's JobModel.
+ val container0Tasks = Map(
+ task0Name -> new TaskModel(task0Name, checkpoint0, new Partition(4)),
+ task2Name -> new TaskModel(task2Name, checkpoint2, new Partition(5)))
+ val container1Tasks = Map(
+ task1Name -> new TaskModel(task1Name, checkpoint1, new Partition(3)))
+ val containers = Map(
+ Integer.valueOf(0) -> new ContainerModel(0, container0Tasks),
+ Integer.valueOf(1) -> new ContainerModel(1, container1Tasks))
+
+
+ // The test does not pass offsets for task2 (Partition 2) to the checkpoint manager; this verifies that we get an offset of 0 for this partition
+ val checkpointOffset0 = MockCoordinatorStreamWrappedConsumer.CHECKPOINTPREFIX + "mock:" +
+ task0Name.getTaskName() -> (Util.sspToString(checkpoint0.keySet.iterator.next()) + ":" + checkpoint0.values.iterator.next())
+ val checkpointOffset1 = MockCoordinatorStreamWrappedConsumer.CHECKPOINTPREFIX + "mock:" +
+ task1Name.getTaskName() -> (Util.sspToString(checkpoint1.keySet.iterator.next()) + ":" + checkpoint1.values.iterator.next())
+ val changelogInfo0 = MockCoordinatorStreamWrappedConsumer.CHANGELOGPREFIX + "mock:" + task0Name.getTaskName() -> "4"
+ val changelogInfo1 = MockCoordinatorStreamWrappedConsumer.CHANGELOGPREFIX + "mock:" + task1Name.getTaskName() -> "3"
+ val changelogInfo2 = MockCoordinatorStreamWrappedConsumer.CHANGELOGPREFIX + "mock:" + task2Name.getTaskName() -> "5"
+
+ // Configs which are processed by the MockCoordinatorStream as special configs which are interpreted as
+ // SetCheckpoint and SetChangelog
+ val otherConfigs = Map(
+ checkpointOffset0,
+ checkpointOffset1,
+ changelogInfo0,
+ changelogInfo1,
+ changelogInfo2
+ )
+
+ val config = Map(
+ JobConfig.JOB_NAME -> "test",
+ JobConfig.JOB_COORDINATOR_SYSTEM -> "coordinator",
+ JobConfig.JOB_CONTAINER_COUNT -> "2",
+ TaskConfig.INPUT_STREAMS -> "test.stream1",
+ SystemConfig.SYSTEM_FACTORY.format("test") -> classOf[MockSystemFactory].getCanonicalName,
+ SystemConfig.SYSTEM_FACTORY.format("coordinator") -> classOf[MockCoordinatorStreamSystemFactory].getName
+ )
+
+ // We want the mocksystemconsumer to use the same instance across runs
+ MockCoordinatorStreamSystemFactory.enableMockConsumerCache()
+
+ val coordinator = JobCoordinator(new MapConfig(config ++ otherConfigs))
+ coordinator.start
+ val jobModel = new JobModel(new MapConfig(config), containers)
+ assertEquals(new MapConfig(config), coordinator.jobModel.getConfig)
+ assertEquals(jobModel, coordinator.jobModel)
+ }
+
+ @Test
+ def testJobCoordinatorCheckpointing = {
+
val task0Name = new TaskName("Partition 0")
+ val checkpoint0 = Map(new SystemStreamPartition("test", "stream1", new Partition(0)) -> "4")
val task1Name = new TaskName("Partition 1")
+ val checkpoint1 = Map(new SystemStreamPartition("test", "stream1", new Partition(1)) ->"3")
val task2Name = new TaskName("Partition 2")
+ val checkpoint2 = Map(new SystemStreamPartition("test", "stream1", new Partition(2)) -> "8")
+
+ // Construct the expected JobModel, so we can compare it to
+ // JobCoordinator's JobModel.
val container0Tasks = Map(
- task0Name -> new TaskModel(task0Name, Set(new SystemStreamPartition("test", "stream1", new Partition(0))), new Partition(4)),
- task2Name -> new TaskModel(task2Name, Set(new SystemStreamPartition("test", "stream1", new Partition(2))), new Partition(5)))
+ task0Name -> new TaskModel(task0Name, checkpoint0, new Partition(4)),
+ task2Name -> new TaskModel(task2Name, checkpoint2, new Partition(5)))
val container1Tasks = Map(
- task1Name -> new TaskModel(task1Name, Set(new SystemStreamPartition("test", "stream1", new Partition(1))), new Partition(3)))
+ task1Name -> new TaskModel(task1Name, checkpoint1, new Partition(3)))
val containers = Map(
Integer.valueOf(0) -> new ContainerModel(0, container0Tasks),
Integer.valueOf(1) -> new ContainerModel(1, container1Tasks))
- val jobModel = new JobModel(config, containers)
- assertEquals(config, coordinator.jobModel.getConfig)
- assertEquals(jobModel, coordinator.jobModel)
+
+
+ // The test does not pass offsets for task2 (Partition 2) to the checkpoint manager; this verifies that we get an offset of 0 for this partition
+ val checkpointOffset0 = MockCoordinatorStreamWrappedConsumer.CHECKPOINTPREFIX + "mock:" +
+ task0Name.getTaskName() -> (Util.sspToString(checkpoint0.keySet.iterator.next()) + ":" + checkpoint0.values.iterator.next())
+ val checkpointOffset1 = MockCoordinatorStreamWrappedConsumer.CHECKPOINTPREFIX + "mock:" +
+ task1Name.getTaskName() -> (Util.sspToString(checkpoint1.keySet.iterator.next()) + ":" + checkpoint1.values.iterator.next())
+ val checkpointOffset2 = MockCoordinatorStreamWrappedConsumer.CHECKPOINTPREFIX + "mock:" +
+ task2Name.getTaskName() -> (Util.sspToString(checkpoint2.keySet.iterator.next()) + ":" + checkpoint2.values.iterator.next())
+ val changelogInfo0 = MockCoordinatorStreamWrappedConsumer.CHANGELOGPREFIX + "mock:" + task0Name.getTaskName() -> "4"
+ val changelogInfo1 = MockCoordinatorStreamWrappedConsumer.CHANGELOGPREFIX + "mock:" + task1Name.getTaskName() -> "3"
+ val changelogInfo2 = MockCoordinatorStreamWrappedConsumer.CHANGELOGPREFIX + "mock:" + task2Name.getTaskName() -> "5"
+
+ // Configs which are processed by the MockCoordinatorStream as special configs which are interpreted as
+ // SetCheckpoint and SetChangelog
+ // Write a couple of checkpoints that the job coordinator will process
+ val otherConfigs = Map(
+ checkpointOffset0,
+ changelogInfo0
+ )
+
+ val config = Map(
+ JobConfig.JOB_NAME -> "test",
+ JobConfig.JOB_COORDINATOR_SYSTEM -> "coordinator",
+ JobConfig.JOB_CONTAINER_COUNT -> "2",
+ TaskConfig.INPUT_STREAMS -> "test.stream1",
+ SystemConfig.SYSTEM_FACTORY.format("test") -> classOf[MockSystemFactory].getCanonicalName,
+ SystemConfig.SYSTEM_FACTORY.format("coordinator") -> classOf[MockCoordinatorStreamSystemFactory].getName
+ )
+
+ // Enable caching on MockConsumer to add more messages later
+ MockCoordinatorStreamSystemFactory.enableMockConsumerCache()
+
+ // start the job coordinator and verify if it has all the checkpoints through http port
+ val coordinator = JobCoordinator(new MapConfig(config ++ otherConfigs))
+ coordinator.start
+ val url = coordinator.server.getUrl.toString
+
+ // Verify if the jobCoordinator has seen the checkpoints
+ var offsets = extractOffsetsFromJobCoordinator(url)
+ assertEquals(1, offsets.size)
+ assertEquals(checkpoint0.head._2, offsets.getOrElse(checkpoint0.head._1, fail()))
+
+ // Write more checkpoints
+ val wrappedConsumer = new MockCoordinatorStreamSystemFactory()
+ .getConsumer(null, null, null)
+ .asInstanceOf[MockCoordinatorStreamWrappedConsumer]
+
+ var moreMessageConfigs = Map(
+ checkpointOffset1
+ )
+
+ wrappedConsumer.addMoreMessages(new MapConfig(moreMessageConfigs))
+
+ // Verify if the coordinator has seen it
+ offsets = extractOffsetsFromJobCoordinator(url)
+ assertEquals(2, offsets.size)
+ assertEquals(checkpoint0.head._2, offsets.getOrElse(checkpoint0.head._1, fail()))
+ assertEquals(checkpoint1.head._2, offsets.getOrElse(checkpoint1.head._1, fail()))
+
+ // Write more checkpoints but block on read on the mock consumer
+ moreMessageConfigs = Map(
+ checkpointOffset2
+ )
+
+ wrappedConsumer.addMoreMessages(new MapConfig(moreMessageConfigs))
+
+ // Simulate consumer being blocked (Job coordinator waiting to read new checkpoints from coordinator after container failure)
+ val latch = wrappedConsumer.blockPool();
+
+ // verify if the port times out
+ var seenException = false
+ try {
+ extractOffsetsFromJobCoordinator(url)
+ }
+ catch {
+ case se: SocketTimeoutException => seenException = true
+ }
+ assertTrue(seenException)
+
+ // verify if it has read the new checkpoints after job coordinator has loaded the new checkpoints
+ latch.countDown()
+ offsets = extractOffsetsFromJobCoordinator(url)
+ assertEquals(3, offsets.size)
+ assertEquals(checkpoint0.head._2, offsets.getOrElse(checkpoint0.head._1, fail()))
+ assertEquals(checkpoint1.head._2, offsets.getOrElse(checkpoint1.head._1, fail()))
+ assertEquals(checkpoint2.head._2, offsets.getOrElse(checkpoint2.head._1, fail()))
+ coordinator.stop
}
-}
-object MockCheckpointManager {
- var mapping: java.util.Map[TaskName, java.lang.Integer] = Map[TaskName, java.lang.Integer](
- new TaskName("Partition 0") -> 4,
- new TaskName("Partition 1") -> 3)
-}
+ def extractOffsetsFromJobCoordinator(url : String) = {
+ val jobModel = SamzaContainer.readJobModel(url)
+ val taskModels = jobModel.getContainers.values().flatMap(_.getTasks.values())
+ val offsets = taskModels.flatMap(_.getCheckpointedOffsets).toMap
+ offsets.filter(_._2 != null)
+ }
-class MockCheckpointManagerFactory extends CheckpointManagerFactory {
- def getCheckpointManager(config: Config, registry: MetricsRegistry) = new MockCheckpointManager
-}
-class MockCheckpointManager extends CheckpointManager {
- def start() {}
- def register(taskName: TaskName) {}
- def writeCheckpoint(taskName: TaskName, checkpoint: Checkpoint) {}
- def readLastCheckpoint(taskName: TaskName) = null
- def readChangeLogPartitionMapping = MockCheckpointManager.mapping
- def writeChangeLogPartitionMapping(mapping: java.util.Map[TaskName, java.lang.Integer]) {
- MockCheckpointManager.mapping = mapping
+ @After
+ def tearDown() = {
+ MockCoordinatorStreamSystemFactory.disableMockConsumerCache()
}
- def stop() {}
}
class MockSystemFactory extends SystemFactory {
- def getConsumer(systemName: String, config: Config, registry: MetricsRegistry) = null
+ def getConsumer(systemName: String, config: Config, registry: MetricsRegistry) = new SystemConsumer {
+ def start() {}
+ def stop() {}
+ def register(systemStreamPartition: SystemStreamPartition, offset: String) {}
+ def poll(systemStreamPartitions: java.util.Set[SystemStreamPartition], timeout: Long) = new java.util.HashMap[SystemStreamPartition, java.util.List[IncomingMessageEnvelope]]()
+ }
def getProducer(systemName: String, config: Config, registry: MetricsRegistry) = null
def getAdmin(systemName: String, config: Config) = new MockSystemAdmin
}
@@ -117,5 +261,11 @@ class MockSystemAdmin extends SystemAdmin {
Map(streamNames.toList.head -> new SystemStreamMetadata("foo", partitionMetadata))
}
- override def createChangelogStream(streamName: String, numOfPartitions: Int) = ???
+ override def createChangelogStream(topicName: String, numOfChangeLogPartitions: Int) {
+ throw new UnsupportedOperationException("Method not implemented.")
+ }
+
+ override def createCoordinatorStream(streamName: String) {
+ throw new UnsupportedOperationException("Method not implemented.")
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/test/scala/org/apache/samza/coordinator/server/TestHttpServer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/coordinator/server/TestHttpServer.scala b/samza-core/src/test/scala/org/apache/samza/coordinator/server/TestHttpServer.scala
index 9a8e145..6ce8aa9 100644
--- a/samza-core/src/test/scala/org/apache/samza/coordinator/server/TestHttpServer.scala
+++ b/samza-core/src/test/scala/org/apache/samza/coordinator/server/TestHttpServer.scala
@@ -19,9 +19,9 @@
package org.apache.samza.coordinator.server
+import org.apache.samza.util.Util
import org.junit.Assert._
import org.junit.Test
-import org.apache.samza.util.Util
import java.net.URL
class TestHttpServer {
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/test/scala/org/apache/samza/job/local/TestProcessJob.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/job/local/TestProcessJob.scala b/samza-core/src/test/scala/org/apache/samza/job/local/TestProcessJob.scala
index 3a710a8..a1efe6f 100644
--- a/samza-core/src/test/scala/org/apache/samza/job/local/TestProcessJob.scala
+++ b/samza-core/src/test/scala/org/apache/samza/job/local/TestProcessJob.scala
@@ -56,7 +56,7 @@ class TestProcessJob {
}
}
-class MockJobCoordinator extends JobCoordinator(null, null) {
+class MockJobCoordinator extends JobCoordinator(null, null, null) {
var stopped: Boolean = false
override def start: Unit = { }
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/test/scala/org/apache/samza/serializers/TestJsonSerde.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/serializers/TestJsonSerde.scala b/samza-core/src/test/scala/org/apache/samza/serializers/TestJsonSerde.scala
index 6046071..4f1c14c 100644
--- a/samza-core/src/test/scala/org/apache/samza/serializers/TestJsonSerde.scala
+++ b/samza-core/src/test/scala/org/apache/samza/serializers/TestJsonSerde.scala
@@ -23,11 +23,13 @@ import org.junit.Assert._
import org.junit.Test
import scala.collection.JavaConversions._
+import scala.collection.immutable.HashMap
+
class TestJsonSerde {
@Test
def testJsonSerdeShouldWork {
- val serde = new JsonSerde
+ val serde = new JsonSerde[java.util.HashMap[String, Object]]
val obj = new java.util.HashMap[String, Object](Map[String, Object]("hi" -> "bye", "why" -> new java.lang.Integer(2)))
val bytes = serde.toBytes(obj)
assertEquals(obj, serde.fromBytes(bytes))
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKey.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKey.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKey.scala
deleted file mode 100644
index 5d8ee4f..0000000
--- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointLogKey.scala
+++ /dev/null
@@ -1,186 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.checkpoint.kafka
-
-import java.util
-import org.apache.samza.SamzaException
-import org.apache.samza.container.TaskName
-import org.codehaus.jackson.`type`.TypeReference
-import org.codehaus.jackson.map.ObjectMapper
-import scala.collection.JavaConversions._
-
-/**
- * Kafka Checkpoint Log-specific key used to identify what type of entry is
- * written for any particular log entry.
- *
- * @param map Backing map to hold key values
- */
-class KafkaCheckpointLogKey private (val map: Map[String, String]) {
- // This might be better as a case class...
- import KafkaCheckpointLogKey._
-
- /**
- * Serialize this key to bytes
- * @return Key as bytes
- */
- def toBytes(): Array[Byte] = {
- val jMap = new util.HashMap[String, String](map.size)
- jMap.putAll(map)
-
- JSON_MAPPER.writeValueAsBytes(jMap)
- }
-
- private def getKey = map.getOrElse(CHECKPOINT_KEY_KEY, throw new SamzaException("No " + CHECKPOINT_KEY_KEY + " in map for Kafka Checkpoint log key"))
-
- /**
- * Is this key for a checkpoint entry?
- *
- * @return true iff this key's entry is for a checkpoint
- */
- def isCheckpointKey = getKey.equals(CHECKPOINT_KEY_TYPE)
-
- /**
- * Is this key for a changelog partition mapping?
- *
- * @return true iff this key's entry is for a changelog partition mapping
- */
- def isChangelogPartitionMapping = getKey.equals(CHANGELOG_PARTITION_KEY_TYPE)
-
- /**
- * If this Key is for a checkpoint entry, return its associated TaskName.
- *
- * @return TaskName for this checkpoint or throw an exception if this key does not have a TaskName entry
- */
- def getCheckpointTaskName = {
- val asString = map.getOrElse(CHECKPOINT_TASKNAME_KEY, throw new SamzaException("No TaskName in checkpoint key: " + this))
- new TaskName(asString)
- }
-
- def canEqual(other: Any): Boolean = other.isInstanceOf[KafkaCheckpointLogKey]
-
- override def equals(other: Any): Boolean = other match {
- case that: KafkaCheckpointLogKey =>
- (that canEqual this) &&
- map == that.map
- case _ => false
- }
-
- override def hashCode(): Int = {
- val state = Seq(map)
- state.map(_.hashCode()).foldLeft(0)((a, b) => 31 * a + b)
- }
-}
-
-object KafkaCheckpointLogKey {
- /**
- * Messages in the checkpoint log have keys associated with them. These keys are maps that describe the message's
- * type, either a checkpoint or a changelog-partition-mapping.
- */
- val CHECKPOINT_KEY_KEY = "type"
- val CHECKPOINT_KEY_TYPE = "checkpoint"
- val CHANGELOG_PARTITION_KEY_TYPE = "changelog-partition-mapping"
- val CHECKPOINT_TASKNAME_KEY = "taskName"
- val SYSTEMSTREAMPARTITION_GROUPER_FACTORY_KEY = "systemstreampartition-grouper-factory"
-
- /**
- * Partition mapping keys have no dynamic values, so we just need one instance.
- */
- val CHANGELOG_PARTITION_MAPPING_KEY = new KafkaCheckpointLogKey(Map(CHECKPOINT_KEY_KEY -> CHANGELOG_PARTITION_KEY_TYPE))
-
- private val JSON_MAPPER = new ObjectMapper()
- val KEY_TYPEREFERENCE = new TypeReference[util.HashMap[String, String]]() {}
-
- var systemStreamPartitionGrouperFactoryString:Option[String] = None
-
- /**
- * Set the name of the factory configured to provide the SystemStreamPartition grouping
- * so it be included in the key.
- *
- * @param str Config value of SystemStreamPartition Grouper Factory
- */
- def setSystemStreamPartitionGrouperFactoryString(str:String) = {
- systemStreamPartitionGrouperFactoryString = Some(str)
- }
-
- /**
- * Get the name of the factory configured to provide the SystemStreamPartition grouping
- * so it be included in the key
- */
- def getSystemStreamPartitionGrouperFactoryString = systemStreamPartitionGrouperFactoryString.getOrElse(throw new SamzaException("No SystemStreamPartition grouping factory string has been set."))
-
- /**
- * Build a key for a a checkpoint log entry for a particular TaskName
- * @param taskName TaskName to build for this checkpoint entry
- *
- * @return Key for checkpoint log entry
- */
- def getCheckpointKey(taskName:TaskName) = {
- val map = Map(CHECKPOINT_KEY_KEY -> CHECKPOINT_KEY_TYPE,
- CHECKPOINT_TASKNAME_KEY -> taskName.getTaskName,
- SYSTEMSTREAMPARTITION_GROUPER_FACTORY_KEY -> getSystemStreamPartitionGrouperFactoryString)
-
- new KafkaCheckpointLogKey(map)
- }
-
- /**
- * Build a key for a changelog partition mapping entry
- *
- * @return Key for changelog partition mapping entry
- */
- def getChangelogPartitionMappingKey() = CHANGELOG_PARTITION_MAPPING_KEY
-
- /**
- * Deserialize a Kafka checkpoint log key
- * @param bytes Serialized (via JSON) Kafka checkpoint log key
- * @return Checkpoint log key
- */
- def fromBytes(bytes: Array[Byte]): KafkaCheckpointLogKey = {
- try {
- val jmap: util.HashMap[String, String] = JSON_MAPPER.readValue(bytes, KEY_TYPEREFERENCE)
-
- if(!jmap.containsKey(CHECKPOINT_KEY_KEY)) {
- throw new SamzaException("No type entry in checkpoint key: " + jmap)
- }
-
- // Only checkpoint keys have ssp grouper factory keys
- if(jmap.get(CHECKPOINT_KEY_KEY).equals(CHECKPOINT_KEY_TYPE)) {
- val sspGrouperFactory = jmap.get(SYSTEMSTREAMPARTITION_GROUPER_FACTORY_KEY)
-
- if (sspGrouperFactory == null) {
- throw new SamzaException("No SystemStreamPartition Grouper factory entry in checkpoint key: " + jmap)
- }
-
- if (!sspGrouperFactory.equals(getSystemStreamPartitionGrouperFactoryString)) {
- throw new DifferingSystemStreamPartitionGrouperFactoryValues(sspGrouperFactory, getSystemStreamPartitionGrouperFactoryString)
- }
- }
-
- new KafkaCheckpointLogKey(jmap.toMap)
- } catch {
- case e: Exception =>
- throw new SamzaException("Exception while deserializing checkpoint key", e)
- }
- }
-}
-
-class DifferingSystemStreamPartitionGrouperFactoryValues(inKey:String, inConfig:String) extends SamzaException {
- override def getMessage() = "Checkpoint key's SystemStreamPartition Grouper factory (" + inKey +
- ") does not match value from current configuration (" + inConfig + "). " +
- "This likely means the SystemStreamPartitionGrouper was changed between job runs, which is not supported."
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
deleted file mode 100644
index c9504ec..0000000
--- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
+++ /dev/null
@@ -1,427 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.checkpoint.kafka
-
-import org.apache.samza.util.Logging
-import java.nio.ByteBuffer
-import java.util
-import kafka.admin.AdminUtils
-import kafka.api._
-import kafka.common.ErrorMapping
-import kafka.common.InvalidMessageSizeException
-import kafka.common.TopicAndPartition
-import kafka.common.TopicExistsException
-import kafka.common.UnknownTopicOrPartitionException
-import kafka.consumer.SimpleConsumer
-import kafka.message.InvalidMessageException
-import kafka.utils.Utils
-import org.I0Itec.zkclient.ZkClient
-import org.apache.samza.SamzaException
-import org.apache.samza.checkpoint.Checkpoint
-import org.apache.samza.checkpoint.CheckpointManager
-import org.apache.samza.container.TaskName
-import org.apache.samza.serializers.CheckpointSerde
-import org.apache.samza.system.kafka.TopicMetadataCache
-import org.apache.samza.util.ExponentialSleepStrategy
-import org.apache.samza.util.TopicMetadataStore
-import scala.collection.mutable
-import java.util.Properties
-import org.apache.kafka.clients.producer.{Producer, ProducerRecord}
-import org.apache.samza.util.KafkaUtil
-
-/**
- * Kafka checkpoint manager is used to store checkpoints in a Kafka topic.
- * To read a checkpoint for a specific taskName, we find the newest message
- * keyed to that taskName. If there is no such message, no checkpoint data
- * exists. The underlying log has a single partition into which all
- * checkpoints and TaskName to changelog partition mappings are written.
- */
-class KafkaCheckpointManager(
- clientId: String,
- checkpointTopic: String,
- systemName: String,
- replicationFactor: Int,
- socketTimeout: Int,
- bufferSize: Int,
- fetchSize: Int,
- metadataStore: TopicMetadataStore,
- connectProducer: () => Producer[Array[Byte], Array[Byte]],
- connectZk: () => ZkClient,
- systemStreamPartitionGrouperFactoryString: String,
- retryBackoff: ExponentialSleepStrategy = new ExponentialSleepStrategy,
- serde: CheckpointSerde = new CheckpointSerde,
- checkpointTopicProperties: Properties = new Properties) extends CheckpointManager with Logging {
- import KafkaCheckpointManager._
-
- var taskNames = Set[TaskName]()
- var producer: Producer[Array[Byte], Array[Byte]] = null
- var taskNamesToOffsets: Map[TaskName, Checkpoint] = null
-
- var startingOffset: Option[Long] = None // Where to start reading for each subsequent call of readCheckpoint
-
- KafkaCheckpointLogKey.setSystemStreamPartitionGrouperFactoryString(systemStreamPartitionGrouperFactoryString)
-
- info("Creating KafkaCheckpointManager with: clientId=%s, checkpointTopic=%s, systemName=%s" format(clientId, checkpointTopic, systemName))
-
- /**
- * Write Checkpoint for specified taskName to log
- *
- * @param taskName Specific Samza taskName of which to write a checkpoint of.
- * @param checkpoint Reference to a Checkpoint object to store offset data in.
- **/
- override def writeCheckpoint(taskName: TaskName, checkpoint: Checkpoint) {
- val key = KafkaCheckpointLogKey.getCheckpointKey(taskName)
- val keyBytes = key.toBytes()
- val msgBytes = serde.toBytes(checkpoint)
-
- writeLog(CHECKPOINT_LOG4J_ENTRY, keyBytes, msgBytes)
- }
-
- /**
- * Write the taskName to partition mapping that is being maintained by this CheckpointManager
- *
- * @param changelogPartitionMapping Each TaskName's partition within the changelog
- */
- override def writeChangeLogPartitionMapping(changelogPartitionMapping: util.Map[TaskName, java.lang.Integer]) {
- val key = KafkaCheckpointLogKey.getChangelogPartitionMappingKey()
- val keyBytes = key.toBytes()
- val msgBytes = serde.changelogPartitionMappingToBytes(changelogPartitionMapping)
-
- writeLog(CHANGELOG_PARTITION_MAPPING_LOG4j, keyBytes, msgBytes)
- }
-
- /**
- * Common code for writing either checkpoints or changelog-partition-mappings to the log
- *
- * @param logType Type of entry that is being written, for logging
- * @param key pre-serialized key for message
- * @param msg pre-serialized message to write to log
- */
- private def writeLog(logType:String, key: Array[Byte], msg: Array[Byte]) {
- retryBackoff.run(
- loop => {
- if (producer == null) {
- producer = connectProducer()
- }
-
- producer.send(new ProducerRecord(checkpointTopic, 0, key, msg)).get()
- loop.done
- },
-
- (exception, loop) => {
- warn("Failed to write %s partition entry %s: %s. Retrying." format(logType, key, exception))
- debug("Exception detail:", exception)
- if (producer != null) {
- producer.close
- }
- producer = null
- }
- )
- }
-
- private def getConsumer(): SimpleConsumer = {
- val metadataMap = TopicMetadataCache.getTopicMetadata(Set(checkpointTopic), systemName, (topics: Set[String]) => metadataStore.getTopicInfo(topics))
- val metadata = metadataMap(checkpointTopic)
- val partitionMetadata = metadata.partitionsMetadata
- .filter(_.partitionId == 0)
- .headOption
- .getOrElse(throw new KafkaCheckpointException("Tried to find partition information for partition 0 for checkpoint topic, but it didn't exist in Kafka."))
- val leader = partitionMetadata
- .leader
- .getOrElse(throw new SamzaException("No leader available for topic %s" format checkpointTopic))
-
- info("Connecting to leader %s:%d for topic %s and to fetch all checkpoint messages." format(leader.host, leader.port, checkpointTopic))
-
- new SimpleConsumer(leader.host, leader.port, socketTimeout, bufferSize, clientId)
- }
-
- private def getEarliestOffset(consumer: SimpleConsumer, topicAndPartition: TopicAndPartition): Long = consumer.earliestOrLatestOffset(topicAndPartition, OffsetRequest.EarliestTime, -1)
-
- private def getOffset(consumer: SimpleConsumer, topicAndPartition: TopicAndPartition, earliestOrLatest: Long): Long = {
- val offsetResponse = consumer.getOffsetsBefore(new OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(earliestOrLatest, 1))))
- .partitionErrorAndOffsets
- .get(topicAndPartition)
- .getOrElse(throw new KafkaCheckpointException("Unable to find offset information for %s:0" format checkpointTopic))
- // Fail or retry if there was an an issue with the offset request.
- KafkaUtil.maybeThrowException(offsetResponse.error)
-
- val offset: Long = offsetResponse
- .offsets
- .headOption
- .getOrElse(throw new KafkaCheckpointException("Got response, but no offsets defined for %s:0" format checkpointTopic))
-
- offset
- }
-
- /**
- * Read the last checkpoint for specified TaskName
- *
- * @param taskName Specific Samza taskName for which to get the last checkpoint of.
- **/
- override def readLastCheckpoint(taskName: TaskName): Checkpoint = {
- if (!taskNames.contains(taskName)) {
- throw new SamzaException(taskName + " not registered with this CheckpointManager")
- }
-
- info("Reading checkpoint for taskName " + taskName)
-
- if (taskNamesToOffsets == null) {
- info("No TaskName to checkpoint mapping provided. Reading for first time.")
- taskNamesToOffsets = readCheckpointsFromLog()
- } else {
- info("Already existing checkpoint mapping. Merging new offsets")
- taskNamesToOffsets ++= readCheckpointsFromLog()
- }
-
- val checkpoint = taskNamesToOffsets.get(taskName).getOrElse(null)
-
- info("Got checkpoint state for taskName %s: %s" format(taskName, checkpoint))
-
- checkpoint
- }
-
- /**
- * Read through entire log, discarding changelog mapping, and building map of TaskNames to Checkpoints
- */
- private def readCheckpointsFromLog(): Map[TaskName, Checkpoint] = {
- val checkpoints = mutable.Map[TaskName, Checkpoint]()
-
- def shouldHandleEntry(key: KafkaCheckpointLogKey) = key.isCheckpointKey
-
- def handleCheckpoint(payload: ByteBuffer, checkpointKey:KafkaCheckpointLogKey): Unit = {
- val taskName = checkpointKey.getCheckpointTaskName
-
- if (taskNames.contains(taskName)) {
- val checkpoint = serde.fromBytes(Utils.readBytes(payload))
-
- debug("Adding checkpoint " + checkpoint + " for taskName " + taskName)
-
- checkpoints.put(taskName, checkpoint) // replacing any existing, older checkpoints as we go
- }
- }
-
- readLog(CHECKPOINT_LOG4J_ENTRY, shouldHandleEntry, handleCheckpoint)
-
- checkpoints.toMap /* of the immutable kind */
- }
-
- /**
- * Read through entire log, discarding checkpoints, finding latest changelogPartitionMapping
- *
- * Lots of duplicated code from the checkpoint method, but will be better to refactor this code into AM-based
- * checkpoint log reading
- */
- override def readChangeLogPartitionMapping(): util.Map[TaskName, java.lang.Integer] = {
- var changelogPartitionMapping: util.Map[TaskName, java.lang.Integer] = new util.HashMap[TaskName, java.lang.Integer]()
-
- def shouldHandleEntry(key: KafkaCheckpointLogKey) = key.isChangelogPartitionMapping
-
- def handleCheckpoint(payload: ByteBuffer, checkpointKey:KafkaCheckpointLogKey): Unit = {
- changelogPartitionMapping = serde.changelogPartitionMappingFromBytes(Utils.readBytes(payload))
-
- debug("Adding changelog partition mapping" + changelogPartitionMapping)
- }
-
- readLog(CHANGELOG_PARTITION_MAPPING_LOG4j, shouldHandleEntry, handleCheckpoint)
-
- changelogPartitionMapping
- }
-
- /**
- * Common code for reading both changelog partition mapping and change log
- *
- * @param entryType What type of entry to look for within the log key's
- * @param handleEntry Code to handle an entry in the log once it's found
- */
- private def readLog(entryType:String, shouldHandleEntry: (KafkaCheckpointLogKey) => Boolean,
- handleEntry: (ByteBuffer, KafkaCheckpointLogKey) => Unit): Unit = {
- retryBackoff.run[Unit](
- loop => {
- val consumer = getConsumer()
-
- val topicAndPartition = new TopicAndPartition(checkpointTopic, 0)
-
- try {
- var offset = startingOffset.getOrElse(getEarliestOffset(consumer, topicAndPartition))
-
- info("Got offset %s for topic %s and partition 0. Attempting to fetch messages for %s." format(offset, checkpointTopic, entryType))
-
- val latestOffset = getOffset(consumer, topicAndPartition, OffsetRequest.LatestTime)
-
- info("Get latest offset %s for topic %s and partition 0." format(latestOffset, checkpointTopic))
-
- if (offset < 0) {
- info("Got offset 0 (no messages in %s) for topic %s and partition 0, so returning empty collection. If you expected the checkpoint topic to have messages, you're probably going to lose data." format (entryType, checkpointTopic))
- return
- }
-
- while (offset < latestOffset) {
- val request = new FetchRequestBuilder()
- .addFetch(checkpointTopic, 0, offset, fetchSize)
- .maxWait(500)
- .minBytes(1)
- .clientId(clientId)
- .build
-
- val fetchResponse = consumer.fetch(request)
- if (fetchResponse.hasError) {
- warn("Got error code from broker for %s: %s" format(checkpointTopic, fetchResponse.errorCode(checkpointTopic, 0)))
- val errorCode = fetchResponse.errorCode(checkpointTopic, 0)
- if (ErrorMapping.OffsetOutOfRangeCode.equals(errorCode)) {
- warn("Got an offset out of range exception while getting last entry in %s for topic %s and partition 0, so returning a null offset to the KafkaConsumer. Let it decide what to do based on its autooffset.reset setting." format (entryType, checkpointTopic))
- return
- }
- KafkaUtil.maybeThrowException(errorCode)
- }
-
- for (response <- fetchResponse.messageSet(checkpointTopic, 0)) {
- offset = response.nextOffset
- startingOffset = Some(offset) // For next time we call
-
- if (!response.message.hasKey) {
- throw new KafkaCheckpointException("Encountered message without key.")
- }
-
- val checkpointKey = KafkaCheckpointLogKey.fromBytes(Utils.readBytes(response.message.key))
-
- if (!shouldHandleEntry(checkpointKey)) {
- debug("Skipping " + entryType + " entry with key " + checkpointKey)
- } else {
- handleEntry(response.message.payload, checkpointKey)
- }
- }
- }
- } finally {
- consumer.close()
- }
-
- loop.done
- Unit
- },
-
- (exception, loop) => {
- exception match {
- case e: InvalidMessageException => throw new KafkaCheckpointException("Got InvalidMessageException from Kafka, which is unrecoverable, so fail the samza job", e)
- case e: InvalidMessageSizeException => throw new KafkaCheckpointException("Got InvalidMessageSizeException from Kafka, which is unrecoverable, so fail the samza job", e)
- case e: UnknownTopicOrPartitionException => throw new KafkaCheckpointException("Got UnknownTopicOrPartitionException from Kafka, which is unrecoverable, so fail the samza job", e)
- case e: KafkaCheckpointException => throw e
- case e: Exception =>
- warn("While trying to read last %s entry for topic %s and partition 0: %s. Retrying." format(entryType, checkpointTopic, e))
- debug("Exception detail:", e)
- }
- }
- ).getOrElse(throw new SamzaException("Failed to get entries for " + entryType + " from topic " + checkpointTopic))
-
- }
-
- def start {
- create
- validateTopic
- }
-
- def register(taskName: TaskName) {
- debug("Adding taskName " + taskName + " to " + this)
- taskNames += taskName
- }
-
- def stop = {
- if (producer != null) {
- producer.close
- }
- }
-
- def create {
- info("Attempting to create checkpoint topic %s." format checkpointTopic)
- retryBackoff.run(
- loop => {
- val zkClient = connectZk()
- try {
- AdminUtils.createTopic(
- zkClient,
- checkpointTopic,
- 1,
- replicationFactor,
- checkpointTopicProperties)
- } finally {
- zkClient.close
- }
-
- info("Created checkpoint topic %s." format checkpointTopic)
- loop.done
- },
-
- (exception, loop) => {
- exception match {
- case e: TopicExistsException =>
- info("Checkpoint topic %s already exists." format checkpointTopic)
- loop.done
- case e: Exception =>
- warn("Failed to create topic %s: %s. Retrying." format(checkpointTopic, e))
- debug("Exception detail:", e)
- }
- }
- )
- }
-
- private def validateTopic {
- info("Validating checkpoint topic %s." format checkpointTopic)
- retryBackoff.run(
- loop => {
- val topicMetadataMap = TopicMetadataCache.getTopicMetadata(Set(checkpointTopic), systemName, metadataStore.getTopicInfo)
- val topicMetadata = topicMetadataMap(checkpointTopic)
- KafkaUtil.maybeThrowException(topicMetadata.errorCode)
-
- val partitionCount = topicMetadata.partitionsMetadata.length
- if (partitionCount != 1) {
- throw new KafkaCheckpointException("Checkpoint topic validation failed for topic %s because partition count %s did not match expected partition count of 1." format(checkpointTopic, topicMetadata.partitionsMetadata.length))
- }
-
- info("Successfully validated checkpoint topic %s." format checkpointTopic)
- loop.done
- },
-
- (exception, loop) => {
- exception match {
- case e: KafkaCheckpointException => throw e
- case e: Exception =>
- warn("While trying to validate topic %s: %s. Retrying." format(checkpointTopic, e))
- debug("Exception detail:", e)
- }
- }
- )
- }
-
- override def toString = "KafkaCheckpointManager [systemName=%s, checkpointTopic=%s]" format(systemName, checkpointTopic)
-}
-
-object KafkaCheckpointManager {
- val CHECKPOINT_LOG4J_ENTRY = "checkpoint log"
- val CHANGELOG_PARTITION_MAPPING_LOG4j = "changelog partition mapping"
-}
-
-/**
- * KafkaCheckpointManager handles retries, so we need two kinds of exceptions:
- * one to signal a hard failure, and the other to retry. The
- * KafkaCheckpointException is thrown to indicate a hard failure that the Kafka
- * CheckpointManager can't recover from.
- */
-class KafkaCheckpointException(s: String, t: Throwable) extends SamzaException(s, t) {
- def this(s: String) = this(s, null)
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
deleted file mode 100644
index 3dfa26a..0000000
--- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManagerFactory.scala
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.checkpoint.kafka
-
-import org.apache.samza.util.Logging
-import kafka.utils.ZKStringSerializer
-import org.I0Itec.zkclient.ZkClient
-import org.apache.samza.SamzaException
-import org.apache.samza.checkpoint.CheckpointManager
-import org.apache.samza.checkpoint.CheckpointManagerFactory
-import org.apache.samza.config.Config
-import org.apache.samza.config.JobConfig.Config2Job
-import org.apache.samza.config.KafkaConfig.Config2Kafka
-import org.apache.samza.metrics.MetricsRegistry
-import org.apache.samza.util.{ KafkaUtil, ClientUtilTopicMetadataStore }
-import java.util.Properties
-import scala.collection.JavaConversions._
-import org.apache.kafka.clients.producer.KafkaProducer
-
-object KafkaCheckpointManagerFactory {
- /**
- * Version number to track the format of the checkpoint log
- */
- val CHECKPOINT_LOG_VERSION_NUMBER = 1
-
- val INJECTED_PRODUCER_PROPERTIES = Map(
- "acks" -> "all",
- // Forcibly disable compression because Kafka doesn't support compression
- // on log compacted topics. Details in SAMZA-586.
- "compression.type" -> "none")
-
- // Set the checkpoint topic configs to have a very small segment size and
- // enable log compaction. This keeps job startup time small since there
- // are fewer useless (overwritten) messages to read from the checkpoint
- // topic.
- def getCheckpointTopicProperties(config: Config) = {
- val segmentBytes = config
- .getCheckpointSegmentBytes
- .getOrElse("26214400")
-
- (new Properties /: Map(
- "cleanup.policy" -> "compact",
- "segment.bytes" -> segmentBytes)) { case (props, (k, v)) => props.put(k, v); props }
- }
-}
-
-class KafkaCheckpointManagerFactory extends CheckpointManagerFactory with Logging {
- import KafkaCheckpointManagerFactory._
-
- def getCheckpointManager(config: Config, registry: MetricsRegistry): CheckpointManager = {
- val clientId = KafkaUtil.getClientId("samza-checkpoint-manager", config)
- val systemName = config
- .getCheckpointSystem
- .getOrElse(throw new SamzaException("no system defined for Kafka's checkpoint manager."))
- val producerConfig = config.getKafkaSystemProducerConfig(
- systemName,
- clientId,
- INJECTED_PRODUCER_PROPERTIES)
- val consumerConfig = config.getKafkaSystemConsumerConfig(systemName, clientId)
- val replicationFactor = config.getCheckpointReplicationFactor.getOrElse("3").toInt
- val socketTimeout = consumerConfig.socketTimeoutMs
- val bufferSize = consumerConfig.socketReceiveBufferBytes
- val fetchSize = consumerConfig.fetchMessageMaxBytes // must be > buffer size
-
- val connectProducer = () => {
- new KafkaProducer[Array[Byte], Array[Byte]](producerConfig.getProducerProperties)
- }
- val zkConnect = Option(consumerConfig.zkConnect)
- .getOrElse(throw new SamzaException("no zookeeper.connect defined in config"))
- val connectZk = () => {
- new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer)
- }
- val jobName = config.getName.getOrElse(throw new SamzaException("Missing job name in configs"))
- val jobId = config.getJobId.getOrElse("1")
- val bootstrapServers = producerConfig.bootsrapServers
- val metadataStore = new ClientUtilTopicMetadataStore(bootstrapServers, clientId, socketTimeout)
- val checkpointTopic = getTopic(jobName, jobId)
-
- // Find out the SSPGrouperFactory class so it can be included/verified in the key
- val systemStreamPartitionGrouperFactoryString = config.getSystemStreamPartitionGrouperFactory
-
- new KafkaCheckpointManager(
- clientId,
- checkpointTopic,
- systemName,
- replicationFactor,
- socketTimeout,
- bufferSize,
- fetchSize,
- metadataStore,
- connectProducer,
- connectZk,
- systemStreamPartitionGrouperFactoryString,
- checkpointTopicProperties = getCheckpointTopicProperties(config))
- }
-
- private def getTopic(jobName: String, jobId: String) =
- "__samza_checkpoint_ver_%d_for_%s_%s" format (CHECKPOINT_LOG_VERSION_NUMBER, jobName.replaceAll("_", "-"), jobId.replaceAll("_", "-"))
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
index 02a6ea9..a1de887 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
@@ -24,7 +24,6 @@ import java.util.regex.Pattern
import org.apache.samza.util.Util
import org.apache.samza.util.Logging
-
import scala.collection.JavaConversions._
import kafka.consumer.ConsumerConfig
import java.util.{Properties, UUID}
@@ -41,10 +40,6 @@ object KafkaConfig {
val REGEX_RESOLVED_SYSTEM = "job.config.rewriter.%s.system"
val REGEX_INHERITED_CONFIG = "job.config.rewriter.%s.config"
- val CHECKPOINT_SYSTEM = "task.checkpoint.system"
- val CHECKPOINT_REPLICATION_FACTOR = "task.checkpoint.replication.factor"
- val CHECKPOINT_SEGMENT_BYTES = "task.checkpoint.segment.bytes"
-
val CHANGELOG_STREAM_REPLICATION_FACTOR = "stores.%s.changelog.replication.factor"
val CHANGELOG_STREAM_KAFKA_SETTINGS = "stores.%s.changelog.kafka."
// The default segment size to use for changelog topics
@@ -63,11 +58,6 @@ object KafkaConfig {
}
class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
- // checkpoints
- def getCheckpointSystem = getOption(KafkaConfig.CHECKPOINT_SYSTEM)
- def getCheckpointReplicationFactor() = getOption(KafkaConfig.CHECKPOINT_REPLICATION_FACTOR)
- def getCheckpointSegmentBytes() = getOption(KafkaConfig.CHECKPOINT_SEGMENT_BYTES)
-
// custom consumer config
def getConsumerFetchThreshold(name: String) = getOption(KafkaConfig.CONSUMER_FETCH_THRESHOLD format name)
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-kafka/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala b/samza-kafka/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala
index a34c3f2..78467bf 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala
@@ -23,12 +23,12 @@ import org.I0Itec.zkclient.ZkClient
import kafka.utils.{ ZkUtils, ZKStringSerializer }
import org.apache.samza.config.KafkaConfig.{ Config2Kafka, REGEX_RESOLVED_STREAMS }
import org.apache.samza.SamzaException
+import org.apache.samza.util.Util
import collection.JavaConversions._
import org.apache.samza.util.Logging
import scala.collection._
import org.apache.samza.config.TaskConfig.Config2Task
import org.apache.samza.system.SystemStream
-import org.apache.samza.util.Util
import scala.util.Sorting
/**
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
index f783c57..35086f5 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
@@ -94,6 +94,24 @@ class KafkaSystemAdmin(
brokerListString: String,
/**
+ * A method that returns a ZkClient for the Kafka system. This is invoked
+ * when the system admin is attempting to create a coordinator stream.
+ */
+ connectZk: () => ZkClient,
+
+ /**
+ * Custom properties to use when the system admin tries to create a new
+ * coordinator stream.
+ */
+ coordinatorStreamProperties: Properties = new Properties,
+
+ /**
+ * The replication factor to use when the system admin creates a new
+ * coordinator stream.
+ */
+ coordinatorStreamReplicationFactor: Int = 1,
+
+ /**
* The timeout to use for the simple consumer when fetching metadata from
* Kafka. Equivalent to Kafka's socket.timeout.ms configuration.
*/
@@ -113,12 +131,6 @@ class KafkaSystemAdmin(
clientId: String = UUID.randomUUID.toString,
/**
- * A function that returns a Zookeeper client to connect to fetch the meta
- * data information
- */
- connectZk: () => ZkClient,
-
- /**
* Replication factor for the Changelog topic in kafka
* Kafka properties to be used during the Changelog topic creation
*/
@@ -201,8 +213,39 @@ class KafkaSystemAdmin(
(exception, loop) => {
warn("Unable to fetch last offsets for streams %s due to %s. Retrying." format (streams, exception))
debug("Exception detail:", exception)
- }
- ).getOrElse(throw new SamzaException("Failed to get system stream metadata"))
+ }).getOrElse(throw new SamzaException("Failed to get system stream metadata"))
+ }
+
+ def createCoordinatorStream(streamName: String) {
+ info("Attempting to create coordinator stream %s." format streamName)
+ new ExponentialSleepStrategy(initialDelayMs = 500).run(
+ loop => {
+ val zkClient = connectZk()
+ try {
+ AdminUtils.createTopic(
+ zkClient,
+ streamName,
+ 1, // Always one partition for coordinator stream.
+ coordinatorStreamReplicationFactor,
+ coordinatorStreamProperties)
+ } finally {
+ zkClient.close
+ }
+
+ info("Created coordinator stream %s." format streamName)
+ loop.done
+ },
+
+ (exception, loop) => {
+ exception match {
+ case e: TopicExistsException =>
+ info("Coordinator stream %s already exists." format streamName)
+ loop.done
+ case e: Exception =>
+ warn("Failed to create topic %s: %s. Retrying." format (streamName, e))
+ debug("Exception detail:", e)
+ }
+ })
}
/**
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
index c84ceb7..1629035 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
@@ -19,10 +19,13 @@
package org.apache.samza.system.kafka
+import java.util.Properties
+import org.apache.samza.SamzaException
import org.apache.samza.util.{Logging, KafkaUtil, ExponentialSleepStrategy, ClientUtilTopicMetadataStore}
import org.apache.samza.config.Config
import org.apache.samza.metrics.MetricsRegistry
import org.apache.samza.config.KafkaConfig.Config2Kafka
+import org.apache.samza.config.JobConfig.Config2Job
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.samza.system.SystemFactory
import org.apache.samza.config.StorageConfig._
@@ -98,8 +101,14 @@ class KafkaSystemFactory extends SystemFactory with Logging {
val consumerConfig = config.getKafkaSystemConsumerConfig(systemName, clientId)
val timeout = consumerConfig.socketTimeoutMs
val bufferSize = consumerConfig.socketReceiveBufferBytes
+ val zkConnect = Option(consumerConfig.zkConnect)
+ .getOrElse(throw new SamzaException("no zookeeper.connect defined in config"))
+ val connectZk = () => {
+ new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer)
+ }
+ val coordinatorStreamProperties = getCoordinatorTopicProperties(config)
+ val coordinatorStreamReplicationFactor = config.getCoordinatorReplicationFactor.toInt
val storeToChangelog = config.getKafkaChangelogEnabledStores()
-
// Construct the meta information for each topic, if the replication factor is not defined, we use 2 as the number of replicas for the change log stream.
val topicMetaInformation = storeToChangelog.map{case (storeName, topicName) =>
{
@@ -112,10 +121,20 @@ class KafkaSystemFactory extends SystemFactory with Logging {
new KafkaSystemAdmin(
systemName,
bootstrapServers,
+ connectZk,
+ coordinatorStreamProperties,
+ coordinatorStreamReplicationFactor,
timeout,
bufferSize,
clientId,
- () => new ZkClient(consumerConfig.zkConnect, 6000, 6000, ZKStringSerializer),
topicMetaInformation)
}
+
+ def getCoordinatorTopicProperties(config: Config) = {
+ val segmentBytes = config.getCoordinatorSegmentBytes
+ (new Properties /: Map(
+ "cleanup.policy" -> "compact",
+ "segment.bytes" -> segmentBytes)) { case (props, (k, v)) => props.put(k, v); props }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointLogKey.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointLogKey.scala b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointLogKey.scala
deleted file mode 100644
index b76d5ad..0000000
--- a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointLogKey.scala
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.checkpoint.kafka
-
-import org.apache.samza.SamzaException
-import org.apache.samza.container.TaskName
-import org.junit.Assert._
-import org.junit.{Before, Test}
-
-class TestKafkaCheckpointLogKey {
- @Before
- def setSSPGrouperFactoryString() {
- KafkaCheckpointLogKey.setSystemStreamPartitionGrouperFactoryString("hello")
- }
-
- @Test
- def checkpointKeySerializationRoundTrip() {
- val checkpointKey = KafkaCheckpointLogKey.getCheckpointKey(new TaskName("TN"))
- val asBytes = checkpointKey.toBytes()
- val backFromBytes = KafkaCheckpointLogKey.fromBytes(asBytes)
-
- assertEquals(checkpointKey, backFromBytes)
- assertNotSame(checkpointKey, backFromBytes)
- }
-
- @Test
- def changelogPartitionMappingKeySerializationRoundTrip() {
- val key = KafkaCheckpointLogKey.getChangelogPartitionMappingKey()
- val asBytes = key.toBytes()
- val backFromBytes = KafkaCheckpointLogKey.fromBytes(asBytes)
-
- assertEquals(key, backFromBytes)
- assertNotSame(key, backFromBytes)
- }
-
- @Test
- def differingSSPGrouperFactoriesCauseException() {
-
- val checkpointKey = KafkaCheckpointLogKey.getCheckpointKey(new TaskName("TN"))
-
- val asBytes = checkpointKey.toBytes()
-
- KafkaCheckpointLogKey.setSystemStreamPartitionGrouperFactoryString("goodbye")
-
- var gotException = false
- try {
- KafkaCheckpointLogKey.fromBytes(asBytes)
- } catch {
- case se:SamzaException => assertEquals(new DifferingSystemStreamPartitionGrouperFactoryValues("hello", "goodbye").getMessage(), se.getCause.getMessage)
- gotException = true
- }
-
- assertTrue("Should have had an exception since ssp grouper factories didn't match", gotException)
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
deleted file mode 100644
index 7d4bea8..0000000
--- a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
+++ /dev/null
@@ -1,211 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.checkpoint.kafka
-
-import kafka.admin.AdminUtils
-import kafka.common.InvalidMessageSizeException
-import kafka.common.UnknownTopicOrPartitionException
-import kafka.message.InvalidMessageException
-
-import kafka.server.KafkaConfig
-import kafka.server.KafkaServer
-import kafka.utils.TestUtils
-import kafka.utils.TestZKUtils
-import kafka.utils.Utils
-import kafka.utils.ZKStringSerializer
-import kafka.zk.EmbeddedZookeeper
-
-import org.I0Itec.zkclient.ZkClient
-import org.apache.samza.checkpoint.Checkpoint
-import org.apache.samza.config.{KafkaProducerConfig, MapConfig}
-import org.apache.samza.container.TaskName
-import org.apache.samza.container.grouper.stream.GroupByPartitionFactory
-import org.apache.samza.serializers.CheckpointSerde
-import org.apache.samza.system.SystemStreamPartition
-import org.apache.samza.util.{ ClientUtilTopicMetadataStore, TopicMetadataStore }
-import org.apache.samza.{ SamzaException, Partition }
-import org.junit.Assert._
-import org.junit.{ AfterClass, BeforeClass, Test }
-
-import scala.collection._
-import scala.collection.JavaConversions._
-import org.apache.kafka.clients.producer.{ProducerConfig, KafkaProducer}
-
-object TestKafkaCheckpointManager {
- val checkpointTopic = "checkpoint-topic"
- val serdeCheckpointTopic = "checkpoint-topic-invalid-serde"
- val zkConnect: String = TestZKUtils.zookeeperConnect
- var zkClient: ZkClient = null
- val zkConnectionTimeout = 6000
- val zkSessionTimeout = 6000
-
- val brokerId1 = 0
- val brokerId2 = 1
- val brokerId3 = 2
- val ports = TestUtils.choosePorts(3)
- val (port1, port2, port3) = (ports(0), ports(1), ports(2))
-
- val props1 = TestUtils.createBrokerConfig(brokerId1, port1)
- props1.put("controlled.shutdown.enable", "true")
- val props2 = TestUtils.createBrokerConfig(brokerId2, port2)
- props1.put("controlled.shutdown.enable", "true")
- val props3 = TestUtils.createBrokerConfig(brokerId3, port3)
- props1.put("controlled.shutdown.enable", "true")
-
- val config = new java.util.HashMap[String, Object]()
- val brokers = "localhost:%d,localhost:%d,localhost:%d" format (port1, port2, port3)
- config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
- config.put("acks", "all")
- config.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, new Integer(1))
- config.put(ProducerConfig.RETRIES_CONFIG, new Integer(java.lang.Integer.MAX_VALUE-1))
- config.putAll(KafkaCheckpointManagerFactory.INJECTED_PRODUCER_PROPERTIES)
- val producerConfig = new KafkaProducerConfig("kafka", "i001", config)
- val partition = new Partition(0)
- val cp1 = new Checkpoint(Map(new SystemStreamPartition("kafka", "topic", partition) -> "123"))
- val cp2 = new Checkpoint(Map(new SystemStreamPartition("kafka", "topic", partition) -> "12345"))
- var zookeeper: EmbeddedZookeeper = null
- var server1: KafkaServer = null
- var server2: KafkaServer = null
- var server3: KafkaServer = null
- var metadataStore: TopicMetadataStore = null
-
- val systemStreamPartitionGrouperFactoryString = classOf[GroupByPartitionFactory].getCanonicalName
-
- @BeforeClass
- def beforeSetupServers {
- zookeeper = new EmbeddedZookeeper(zkConnect)
- server1 = TestUtils.createServer(new KafkaConfig(props1))
- server2 = TestUtils.createServer(new KafkaConfig(props2))
- server3 = TestUtils.createServer(new KafkaConfig(props3))
- metadataStore = new ClientUtilTopicMetadataStore(brokers, "some-job-name")
- }
-
- @AfterClass
- def afterCleanLogDirs {
- server1.shutdown
- server1.awaitShutdown()
- server2.shutdown
- server2.awaitShutdown()
- server3.shutdown
- server3.awaitShutdown()
- Utils.rm(server1.config.logDirs)
- Utils.rm(server2.config.logDirs)
- Utils.rm(server3.config.logDirs)
- zookeeper.shutdown
- }
-}
-
-class TestKafkaCheckpointManager {
- import TestKafkaCheckpointManager._
-
- @Test
- def testCheckpointShouldBeNullIfCheckpointTopicDoesNotExistShouldBeCreatedOnWriteAndShouldBeReadableAfterWrite {
- val kcm = getKafkaCheckpointManager
- val taskName = new TaskName(partition.toString)
- kcm.register(taskName)
- kcm.start
- // check that log compaction is enabled.
- val zkClient = new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer)
- val topicConfig = AdminUtils.fetchTopicConfig(zkClient, checkpointTopic)
- zkClient.close
- assertEquals("compact", topicConfig.get("cleanup.policy"))
- assertEquals("26214400", topicConfig.get("segment.bytes"))
- // read before topic exists should result in a null checkpoint
- var readCp = kcm.readLastCheckpoint(taskName)
- assertNull(readCp)
- // create topic the first time around
- kcm.writeCheckpoint(taskName, cp1)
- readCp = kcm.readLastCheckpoint(taskName)
- assertEquals(cp1, readCp)
- // should get an exception if partition doesn't exist
- try {
- readCp = kcm.readLastCheckpoint(new TaskName(new Partition(1).toString))
- fail("Expected a SamzaException, since only one partition (partition 0) should exist.")
- } catch {
- case e: SamzaException => None // expected
- case _: Exception => fail("Expected a SamzaException, since only one partition (partition 0) should exist.")
- }
- // writing a second message should work, too
- kcm.writeCheckpoint(taskName, cp2)
- readCp = kcm.readLastCheckpoint(taskName)
- assertEquals(cp2, readCp)
- kcm.stop
- }
-
- @Test
- def testUnrecoverableKafkaErrorShouldThrowKafkaCheckpointManagerException {
- val exceptions = List("InvalidMessageException", "InvalidMessageSizeException", "UnknownTopicOrPartitionException")
- exceptions.foreach { exceptionName =>
- val kcm = getKafkaCheckpointManagerWithInvalidSerde(exceptionName)
- val taskName = new TaskName(partition.toString)
- kcm.register(taskName)
- kcm.start
- kcm.writeCheckpoint(taskName, cp1)
- // because serde will throw unrecoverable errors, it should result a KafkaCheckpointException
- try {
- kcm.readLastCheckpoint(taskName)
- fail("Expected a KafkaCheckpointException.")
- } catch {
- case e: KafkaCheckpointException => None
- }
- kcm.stop
- }
- }
-
- private def getKafkaCheckpointManager = new KafkaCheckpointManager(
- clientId = "some-client-id",
- checkpointTopic = checkpointTopic,
- systemName = "kafka",
- replicationFactor = 3,
- socketTimeout = 30000,
- bufferSize = 64 * 1024,
- fetchSize = 300 * 1024,
- metadataStore = metadataStore,
- connectProducer = () => new KafkaProducer(producerConfig.getProducerProperties),
- connectZk = () => new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer),
- systemStreamPartitionGrouperFactoryString = systemStreamPartitionGrouperFactoryString,
- checkpointTopicProperties = KafkaCheckpointManagerFactory.getCheckpointTopicProperties(new MapConfig(Map[String, String]())))
-
- // inject serde. Kafka exceptions will be thrown when serde.fromBytes is called
- private def getKafkaCheckpointManagerWithInvalidSerde(exception: String) = new KafkaCheckpointManager(
- clientId = "some-client-id-invalid-serde",
- checkpointTopic = serdeCheckpointTopic,
- systemName = "kafka",
- replicationFactor = 3,
- socketTimeout = 30000,
- bufferSize = 64 * 1024,
- fetchSize = 300 * 1024,
- metadataStore = metadataStore,
- connectProducer = () => new KafkaProducer(producerConfig.getProducerProperties),
- connectZk = () => new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer),
- systemStreamPartitionGrouperFactoryString = systemStreamPartitionGrouperFactoryString,
- serde = new InvalideSerde(exception),
- checkpointTopicProperties = KafkaCheckpointManagerFactory.getCheckpointTopicProperties(new MapConfig(Map[String, String]())))
-
- class InvalideSerde(exception: String) extends CheckpointSerde {
- override def fromBytes(bytes: Array[Byte]): Checkpoint = {
- exception match {
- case "InvalidMessageException" => throw new InvalidMessageException
- case "InvalidMessageSizeException" => throw new InvalidMessageSizeException
- case "UnknownTopicOrPartitionException" => throw new UnknownTopicOrPartitionException
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
index 0380d35..d260f2d 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
@@ -100,15 +100,15 @@ object TestKafkaSystemAdmin {
REPLICATION_FACTOR)
}
- def validateTopic(expectedPartitionCount: Int) {
+ def validateTopic(topic: String, expectedPartitionCount: Int) {
var done = false
var retries = 0
val maxRetries = 100
while (!done && retries < maxRetries) {
try {
- val topicMetadataMap = TopicMetadataCache.getTopicMetadata(Set(TOPIC), "kafka", metadataStore.getTopicInfo)
- val topicMetadata = topicMetadataMap(TOPIC)
+ val topicMetadataMap = TopicMetadataCache.getTopicMetadata(Set(topic), "kafka", metadataStore.getTopicInfo)
+ val topicMetadata = topicMetadataMap(topic)
val errorCode = topicMetadata.errorCode
KafkaUtil.maybeThrowException(errorCode)
@@ -207,10 +207,9 @@ class TestKafkaSystemAdmin {
@Test
def testShouldGetOldestNewestAndNextOffsets {
-
// Create an empty topic with 50 partitions, but with no offsets.
createTopic
- validateTopic(50)
+ validateTopic(TOPIC, 50)
// Verify the empty topic behaves as expected.
var metadata = systemAdmin.getSystemStreamMetadata(Set(TOPIC))
@@ -271,7 +270,6 @@ class TestKafkaSystemAdmin {
@Test
def testNonExistentTopic {
-
val initialOffsets = systemAdmin.getSystemStreamMetadata(Set("non-existent-topic"))
val metadata = initialOffsets.getOrElse("non-existent-topic", fail("missing metadata"))
assertEquals(metadata, new SystemStreamMetadata("non-existent-topic", Map(
@@ -289,7 +287,21 @@ class TestKafkaSystemAdmin {
assertEquals("3", offsetsAfter(ssp2))
}
- class KafkaSystemAdminWithTopicMetadataError extends KafkaSystemAdmin("test", brokers, connectZk = () => new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer)) {
+ @Test
+ def testShouldCreateCoordinatorStream {
+ val topic = "test-coordinator-stream"
+ val systemAdmin = new KafkaSystemAdmin("test", brokers, () => new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer), coordinatorStreamReplicationFactor = 3)
+ systemAdmin.createCoordinatorStream(topic)
+ validateTopic(topic, 1)
+ val topicMetadataMap = TopicMetadataCache.getTopicMetadata(Set(topic), "kafka", metadataStore.getTopicInfo)
+ assertTrue(topicMetadataMap.contains(topic))
+ val topicMetadata = topicMetadataMap(topic)
+ val partitionMetadata = topicMetadata.partitionsMetadata.head
+ assertEquals(0, partitionMetadata.partitionId)
+ assertEquals(3, partitionMetadata.replicas.size)
+ }
+
+ class KafkaSystemAdminWithTopicMetadataError extends KafkaSystemAdmin("test", brokers, () => new ZkClient(zkConnect, 6000, 6000, ZKStringSerializer)) {
import kafka.api.{ TopicMetadata, TopicMetadataResponse }
// Simulate Kafka telling us that the leader for the topic is not available
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
----------------------------------------------------------------------
diff --git a/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
index d3f25c0..d3616fe 100644
--- a/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
+++ b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
@@ -28,9 +28,13 @@ import org.apache.log4j.Logger;
import org.apache.log4j.spi.LoggingEvent;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
import org.apache.samza.config.Log4jSystemConfig;
import org.apache.samza.config.SerializerConfig;
import org.apache.samza.config.ShellCommandConfig;
+import org.apache.samza.config.StreamConfig;
+import org.apache.samza.config.SystemConfig;
+import org.apache.samza.coordinator.JobCoordinator;
import org.apache.samza.job.model.JobModel;
import org.apache.samza.logging.log4j.serializers.LoggingEventJsonSerdeFactory;
import org.apache.samza.metrics.MetricsRegistryMap;
@@ -177,7 +181,8 @@ public class StreamAppender extends AppenderSkeleton {
try {
if (isApplicationMaster) {
- config = SamzaObjectMapper.getObjectMapper().readValue(System.getenv(ShellCommandConfig.ENV_CONFIG()), Config.class);
+ Config coordinatorSystemConfig = new MapConfig(SamzaObjectMapper.getObjectMapper().readValue(System.getenv(ShellCommandConfig.ENV_COORDINATOR_SYSTEM_CONFIG()), Config.class));
+ config = JobCoordinator.apply(coordinatorSystemConfig).jobModel().getConfig();
} else {
String url = System.getenv(ShellCommandConfig.ENV_COORDINATOR_URL());
config = SamzaObjectMapper.getObjectMapper().readValue(Util.read(new URL(url), 30000), JobModel.class).getConfig();
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-test/src/main/config/join/common.properties
----------------------------------------------------------------------
diff --git a/samza-test/src/main/config/join/common.properties b/samza-test/src/main/config/join/common.properties
index ad10aac..ac87e81 100644
--- a/samza-test/src/main/config/join/common.properties
+++ b/samza-test/src/main/config/join/common.properties
@@ -23,7 +23,7 @@ yarn.package.path=<YARN.PACKAGE.PATH>
job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
-task.checkpoint.system=kafka-checkpoints
+task.checkpoint.system=kafka
task.checkpoint.replication.factor=1
serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
@@ -36,11 +36,5 @@ systems.kafka.producer.bootstrap.servers=localhost:9092
systems.kafka.samza.key.serde=string
systems.kafka.samza.msg.serde=string
- # Checkpoints System
-systems.kafka-checkpoints.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
-systems.kafka-checkpoints.producer.bootstrap.servers=localhost:9092
-systems.kafka-checkpoints.consumer.zookeeper.connect=localhost:2181
-
yarn.container.retry.count=-1
yarn.container.retry.window.ms=60000
-
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-test/src/main/config/negate-number.properties
----------------------------------------------------------------------
diff --git a/samza-test/src/main/config/negate-number.properties b/samza-test/src/main/config/negate-number.properties
index b9f898c..26fa1e3 100644
--- a/samza-test/src/main/config/negate-number.properties
+++ b/samza-test/src/main/config/negate-number.properties
@@ -38,3 +38,6 @@ systems.kafka.samza.key.serde=string
systems.kafka.samza.offset.default=oldest
systems.kafka.consumer.zookeeper.connect=localhost:2181/
systems.kafka.producer.bootstrap.servers=localhost:9092
+
+# Coordinator replication factor
+job.coordinator.replication.factor=1
[4/4] samza git commit: SAMZA-465: Use coordinator stream and
eliminate CheckpointManager
Posted by ni...@apache.org.
SAMZA-465: Use coordinator stream and eliminate CheckpointManager
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/23fb2e1c
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/23fb2e1c
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/23fb2e1c
Branch: refs/heads/master
Commit: 23fb2e1c097317845439ec0abb938821a6106969
Parents: c37d752
Author: Naveen Somasundaram <na...@gmail.com>
Authored: Tue Apr 28 19:57:06 2015 -0700
Committer: Yi Pan (Data Infrastructure) <yi...@yipan-ld1.linkedin.biz>
Committed: Tue Apr 28 19:57:06 2015 -0700
----------------------------------------------------------------------
build.gradle | 6 +-
.../org/apache/samza/checkpoint/Checkpoint.java | 72 ---
.../samza/checkpoint/CheckpointManager.java | 70 ---
.../checkpoint/CheckpointManagerFactory.java | 30 --
.../org/apache/samza/container/TaskName.java | 4 +-
.../org/apache/samza/system/SystemAdmin.java | 29 +-
.../org/apache/samza/task/TaskCoordinator.java | 2 +-
...inglePartitionWithoutOffsetsSystemAdmin.java | 5 +
.../org/apache/samza/checkpoint/Checkpoint.java | 72 +++
.../samza/checkpoint/CheckpointManager.java | 116 +++++
.../stream/CoordinatorStreamMessage.java | 476 +++++++++++++++++++
.../stream/CoordinatorStreamSystemConsumer.java | 197 ++++++++
.../stream/CoordinatorStreamSystemProducer.java | 134 ++++++
.../org/apache/samza/job/model/TaskModel.java | 72 +--
.../serializers/model/JsonTaskModelMixIn.java | 8 +-
.../serializers/model/SamzaObjectMapper.java | 24 +
.../storage/ChangelogPartitionManager.java | 115 +++++
.../samza/checkpoint/CheckpointTool.scala | 34 +-
.../apache/samza/checkpoint/OffsetManager.scala | 58 +--
.../file/FileSystemCheckpointManager.scala | 107 -----
.../org/apache/samza/config/JobConfig.scala | 63 ++-
.../samza/config/ShellCommandConfig.scala | 4 +-
.../org/apache/samza/config/StreamConfig.scala | 2 -
.../org/apache/samza/config/TaskConfig.scala | 2 +-
.../apache/samza/container/SamzaContainer.scala | 18 +-
.../samza/coordinator/JobCoordinator.scala | 270 +++++++----
.../samza/coordinator/server/HttpServer.scala | 3 +-
.../samza/coordinator/server/JobServlet.scala | 7 +-
.../stream/CoordinatorStreamSystemFactory.scala | 50 ++
.../scala/org/apache/samza/job/JobRunner.scala | 77 +--
.../samza/job/local/ProcessJobFactory.scala | 9 +-
.../samza/job/local/ThreadJobFactory.scala | 5 +-
.../apache/samza/serializers/JsonSerde.scala | 30 +-
.../filereader/FileReaderSystemAdmin.scala | 8 +-
.../main/scala/org/apache/samza/util/Util.scala | 131 ++++-
.../MockCoordinatorStreamSystemFactory.java | 118 +++++
.../MockCoordinatorStreamWrappedConsumer.java | 128 +++++
.../stream/TestCoordinatorStreamMessage.java | 66 +++
.../TestCoordinatorStreamSystemConsumer.java | 133 ++++++
.../TestCoordinatorStreamSystemProducer.java | 153 ++++++
.../model/TestSamzaObjectMapper.java | 6 +-
samza-core/src/test/resources/test.properties | 1 +
.../samza/checkpoint/TestCheckpointTool.scala | 20 +-
.../samza/checkpoint/TestOffsetManager.scala | 50 +-
.../file/TestFileSystemCheckpointManager.scala | 86 ----
.../samza/container/TestSamzaContainer.scala | 21 +-
.../task/TestGroupByContainerCount.scala | 2 +-
.../samza/coordinator/TestJobCoordinator.scala | 238 ++++++++--
.../coordinator/server/TestHttpServer.scala | 2 +-
.../apache/samza/job/local/TestProcessJob.scala | 2 +-
.../samza/serializers/TestJsonSerde.scala | 4 +-
.../kafka/KafkaCheckpointLogKey.scala | 186 --------
.../kafka/KafkaCheckpointManager.scala | 427 -----------------
.../kafka/KafkaCheckpointManagerFactory.scala | 116 -----
.../org/apache/samza/config/KafkaConfig.scala | 10 -
.../samza/config/RegExTopicGenerator.scala | 2 +-
.../samza/system/kafka/KafkaSystemAdmin.scala | 59 ++-
.../samza/system/kafka/KafkaSystemFactory.scala | 23 +-
.../kafka/TestKafkaCheckpointLogKey.scala | 71 ---
.../kafka/TestKafkaCheckpointManager.scala | 211 --------
.../system/kafka/TestKafkaSystemAdmin.scala | 26 +-
.../samza/logging/log4j/StreamAppender.java | 7 +-
.../src/main/config/join/common.properties | 8 +-
.../src/main/config/negate-number.properties | 3 +
.../perf/container-performance.properties | 12 +-
.../kafka-read-write-performance.properties | 3 +
.../samza/system/mock/MockSystemAdmin.java | 18 +-
.../src/main/python/samza_failure_testing.py | 2 +
.../test/performance/TestPerformanceTask.scala | 4 +-
.../test/integration/TestStatefulTask.scala | 17 +-
.../org/apache/samza/config/YarnConfig.scala | 3 -
.../apache/samza/job/yarn/SamzaAppMaster.scala | 12 +-
.../samza/job/yarn/SamzaAppMasterState.scala | 3 +-
.../job/yarn/SamzaAppMasterTaskManager.scala | 12 +-
.../org/apache/samza/job/yarn/YarnJob.scala | 9 +-
.../job/yarn/TestSamzaAppMasterLifecycle.scala | 8 +-
.../job/yarn/TestSamzaAppMasterService.scala | 15 +-
.../yarn/TestSamzaAppMasterTaskManager.scala | 42 +-
78 files changed, 2815 insertions(+), 1834 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 97de3a2..ebad6eb 100644
--- a/build.gradle
+++ b/build.gradle
@@ -123,7 +123,9 @@ project(':samza-api') {
project(":samza-core_$scalaVersion") {
apply plugin: 'scala'
-
+ // Force scala joint compilation
+ sourceSets.main.scala.srcDir "src/main/java"
+ sourceSets.main.java.srcDirs = []
jar {
manifest {
attributes("Implementation-Version": "$version")
@@ -239,6 +241,7 @@ project(":samza-yarn_$scalaVersion") {
compile "org.apache.zookeeper:zookeeper:$zookeeperVersion"
testCompile "junit:junit:$junitVersion"
testCompile "org.mockito:mockito-all:$mockitoVersion"
+ testCompile project(":samza-core_$scalaVersion").sourceSets.test.output
}
repositories {
@@ -384,6 +387,7 @@ project(":samza-test_$scalaVersion") {
testCompile "org.apache.kafka:kafka_$scalaVersion:$kafkaVersion:test"
testCompile "com.101tec:zkclient:$zkClientVersion"
testCompile project(":samza-kafka_$scalaVersion")
+ testCompile project(":samza-core_$scalaVersion").sourceSets.test.output
testCompile "org.scalatest:scalatest_$scalaVersion:$scalaTestVersion"
testRuntime "org.slf4j:slf4j-simple:$slf4jVersion"
}
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java b/samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java
deleted file mode 100644
index 593d118..0000000
--- a/samza-api/src/main/java/org/apache/samza/checkpoint/Checkpoint.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.checkpoint;
-
-import org.apache.samza.system.SystemStreamPartition;
-
-import java.util.Collections;
-import java.util.Map;
-
-/**
- * A checkpoint is a mapping of all the streams a job is consuming and the most recent current offset for each.
- * It is used to restore a {@link org.apache.samza.task.StreamTask}, either as part of a job restart or as part
- * of restarting a failed container within a running job.
- */
-public class Checkpoint {
- private final Map<SystemStreamPartition, String> offsets;
-
- /**
- * Constructs a new checkpoint based off a map of Samza stream offsets.
- * @param offsets Map of Samza streams to their current offset.
- */
- public Checkpoint(Map<SystemStreamPartition, String> offsets) {
- this.offsets = offsets;
- }
-
- /**
- * Gets a unmodifiable view of the current Samza stream offsets.
- * @return A unmodifiable view of a Map of Samza streams to their recorded offsets.
- */
- public Map<SystemStreamPartition, String> getOffsets() {
- return Collections.unmodifiableMap(offsets);
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (!(o instanceof Checkpoint)) return false;
-
- Checkpoint that = (Checkpoint) o;
-
- if (offsets != null ? !offsets.equals(that.offsets) : that.offsets != null) return false;
-
- return true;
- }
-
- @Override
- public int hashCode() {
- return offsets != null ? offsets.hashCode() : 0;
- }
-
- @Override
- public String toString() {
- return "Checkpoint [offsets=" + offsets + "]";
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java b/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java
deleted file mode 100644
index 092cb91..0000000
--- a/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.checkpoint;
-
-import org.apache.samza.SamzaException;
-import org.apache.samza.container.TaskName;
-
-import java.util.Map;
-
-/**
- * CheckpointManagers read and write {@link org.apache.samza.checkpoint.Checkpoint} to some
- * implementation-specific location.
- */
-public interface CheckpointManager {
- public void start();
-
- /**
- * Registers this manager to write checkpoints of a specific Samza stream partition.
- * @param taskName Specific Samza taskName of which to write checkpoints for.
- */
- public void register(TaskName taskName);
-
- /**
- * Writes a checkpoint based on the current state of a Samza stream partition.
- * @param taskName Specific Samza taskName of which to write a checkpoint of.
- * @param checkpoint Reference to a Checkpoint object to store offset data in.
- */
- public void writeCheckpoint(TaskName taskName, Checkpoint checkpoint);
-
- /**
- * Returns the last recorded checkpoint for a specified taskName.
- * @param taskName Specific Samza taskName for which to get the last checkpoint of.
- * @return A Checkpoint object with the recorded offset data of the specified partition.
- */
- public Checkpoint readLastCheckpoint(TaskName taskName);
-
- /**
- * Read the taskName to partition mapping that is being maintained by this CheckpointManager
- *
- * @return TaskName to task log partition mapping, or an empty map if there were no messages.
- */
- public Map<TaskName, Integer> readChangeLogPartitionMapping();
-
- /**
- * Write the taskName to partition mapping that is being maintained by this CheckpointManager
- *
- * @param mapping Each TaskName's partition within the changelog
- */
- public void writeChangeLogPartitionMapping(Map<TaskName, Integer> mapping);
-
- public void stop();
-
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManagerFactory.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManagerFactory.java b/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManagerFactory.java
deleted file mode 100644
index a97ff09..0000000
--- a/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManagerFactory.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.checkpoint;
-
-import org.apache.samza.config.Config;
-import org.apache.samza.metrics.MetricsRegistry;
-
-/**
- * Build a {@link org.apache.samza.checkpoint.CheckpointManager}.
- */
-public interface CheckpointManagerFactory {
- public CheckpointManager getCheckpointManager(Config config, MetricsRegistry registry);
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-api/src/main/java/org/apache/samza/container/TaskName.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/container/TaskName.java b/samza-api/src/main/java/org/apache/samza/container/TaskName.java
index 0833586..aa19293 100644
--- a/samza-api/src/main/java/org/apache/samza/container/TaskName.java
+++ b/samza-api/src/main/java/org/apache/samza/container/TaskName.java
@@ -21,7 +21,9 @@ package org.apache.samza.container;
/**
* A unique identifier of a set of a SystemStreamPartitions that have been grouped by
* a {@link org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouper}. The
- * SystemStreamPartitionGrouper determines the TaskName for each set it creates.
+ * SystemStreamPartitionGrouper determines the TaskName for each set it creates. The TaskName class
+ * should only contain the taskName. This is necessary because the ChangelogManager assumes that the taskName
+ * uniquely identifies a task, and uses it as the key when storing entries in the underlying coordinator stream.
*/
public class TaskName implements Comparable<TaskName> {
private final String taskName;
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java b/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java
index 8995ba3..7a588eb 100644
--- a/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java
+++ b/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java
@@ -23,10 +23,9 @@ import java.util.Map;
import java.util.Set;
/**
- * Helper interface attached to an underlying system to fetch
- * information about streams, partitions, offsets, etc. This interface is useful
- * for providing utility methods that Samza needs in order to interact with a
- * system.
+ * Helper interface attached to an underlying system to fetch information about
+ * streams, partitions, offsets, etc. This interface is useful for providing
+ * utility methods that Samza needs in order to interact with a system.
*/
public interface SystemAdmin {
@@ -51,10 +50,22 @@ public interface SystemAdmin {
*/
Map<String, SystemStreamMetadata> getSystemStreamMetadata(Set<String> streamNames);
- /**
- * An API to create a change log stream
- * @param streamName The name of the stream to be created in the underlying stream
- * @param numOfPartitions The number of partitions in the changelog stream
- */
+ /**
+ * An API to create a change log stream
+ *
+ * @param streamName
+ * The name of the stream to be created in the underlying stream
+ * @param numOfPartitions
+ * The number of partitions in the changelog stream
+ */
void createChangelogStream(String streamName, int numOfPartitions);
+
+ /**
+ * Create a stream for the job coordinator. If the stream already exists, this
+ * call should simply return.
+ *
+ * @param streamName
+ * The name of the coordinator stream to create.
+ */
+ void createCoordinatorStream(String streamName);
}
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-api/src/main/java/org/apache/samza/task/TaskCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/task/TaskCoordinator.java b/samza-api/src/main/java/org/apache/samza/task/TaskCoordinator.java
index 6ff1a55..e7bf6f1 100644
--- a/samza-api/src/main/java/org/apache/samza/task/TaskCoordinator.java
+++ b/samza-api/src/main/java/org/apache/samza/task/TaskCoordinator.java
@@ -22,7 +22,7 @@ package org.apache.samza.task;
/**
* TaskCoordinators are provided to the process methods of {@link org.apache.samza.task.StreamTask} implementations
* to allow the user code to request actions from the Samza framework, including committing the current checkpoints
- * to configured {@link org.apache.samza.checkpoint.CheckpointManager}s or shutting down the task or all tasks within
+ * to configured org.apache.samza.checkpoint.CheckpointManager or shutting down the task or all tasks within
* a container.
* <p>
* This interface may evolve over time.
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java b/samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java
index 01997ae..a6b14fb 100644
--- a/samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java
+++ b/samza-api/src/main/java/org/apache/samza/util/SinglePartitionWithoutOffsetsSystemAdmin.java
@@ -69,4 +69,9 @@ public class SinglePartitionWithoutOffsetsSystemAdmin implements SystemAdmin {
return offsetsAfter;
}
+
+ @Override
+ public void createCoordinatorStream(String streamName) {
+ throw new UnsupportedOperationException("Single partition admin can't create coordinator streams.");
+ }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/main/java/org/apache/samza/checkpoint/Checkpoint.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/checkpoint/Checkpoint.java b/samza-core/src/main/java/org/apache/samza/checkpoint/Checkpoint.java
new file mode 100644
index 0000000..593d118
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/checkpoint/Checkpoint.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.checkpoint;
+
+import org.apache.samza.system.SystemStreamPartition;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * A checkpoint is a mapping of all the streams a job is consuming and the most recent current offset for each.
+ * It is used to restore a {@link org.apache.samza.task.StreamTask}, either as part of a job restart or as part
+ * of restarting a failed container within a running job.
+ */
+public class Checkpoint {
+ private final Map<SystemStreamPartition, String> offsets;
+
+ /**
+ * Constructs a new checkpoint based off a map of Samza stream offsets.
+ * @param offsets Map of Samza streams to their current offset.
+ */
+ public Checkpoint(Map<SystemStreamPartition, String> offsets) {
+ this.offsets = offsets;
+ }
+
+ /**
+ * Gets an unmodifiable view of the current Samza stream offsets.
+ * @return An unmodifiable view of a Map of Samza streams to their recorded offsets.
+ */
+ public Map<SystemStreamPartition, String> getOffsets() {
+ return Collections.unmodifiableMap(offsets);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (!(o instanceof Checkpoint)) return false;
+
+ Checkpoint that = (Checkpoint) o;
+
+ if (offsets != null ? !offsets.equals(that.offsets) : that.offsets != null) return false;
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ return offsets != null ? offsets.hashCode() : 0;
+ }
+
+ @Override
+ public String toString() {
+ return "Checkpoint [offsets=" + offsets + "]";
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java b/samza-core/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java
new file mode 100644
index 0000000..3ac63ca
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.checkpoint;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.samza.SamzaException;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.coordinator.stream.CoordinatorStreamMessage;
+import org.apache.samza.coordinator.stream.CoordinatorStreamMessage.SetCheckpoint;
+import org.apache.samza.coordinator.stream.CoordinatorStreamSystemConsumer;
+import org.apache.samza.coordinator.stream.CoordinatorStreamSystemProducer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * The CheckpointManager is used to persist and restore checkpoint information. The CheckpointManager uses
+ * CoordinatorStream underneath to do this.
+ */
+public class CheckpointManager {
+
+ private static final Logger log = LoggerFactory.getLogger(CheckpointManager.class);
+ private final CoordinatorStreamSystemProducer coordinatorStreamProducer;
+ private final CoordinatorStreamSystemConsumer coordinatorStreamConsumer;
+ private final Map<TaskName, Checkpoint> taskNamesToOffsets;
+ private final HashSet<TaskName> taskNames;
+ private String source;
+
+ public CheckpointManager(CoordinatorStreamSystemProducer coordinatorStreamProducer,
+ CoordinatorStreamSystemConsumer coordinatorStreamConsumer) {
+ this.coordinatorStreamConsumer = coordinatorStreamConsumer;
+ this.coordinatorStreamProducer = coordinatorStreamProducer;
+ taskNamesToOffsets = new HashMap<TaskName, Checkpoint>();
+ taskNames = new HashSet<TaskName>();
+ this.source = "Unknown";
+ }
+
+ public CheckpointManager(CoordinatorStreamSystemProducer coordinatorStreamProducer,
+ CoordinatorStreamSystemConsumer coordinatorStreamConsumer,
+ String source) {
+ this(coordinatorStreamProducer, coordinatorStreamConsumer);
+ this.source = source;
+ }
+
+ public void start() {
+ coordinatorStreamProducer.start();
+ coordinatorStreamConsumer.start();
+ }
+
+ /**
+ * Registers this manager to write checkpoints of a specific Samza stream partition.
+ * @param taskName Specific Samza taskName of which to write checkpoints for.
+ */
+ public void register(TaskName taskName) {
+ log.debug("Adding taskName {} to {}", taskName, this);
+ taskNames.add(taskName);
+ coordinatorStreamConsumer.register();
+ coordinatorStreamProducer.register(taskName.getTaskName());
+ }
+
+ /**
+ * Writes a checkpoint based on the current state of a Samza stream partition.
+ * @param taskName Specific Samza taskName of which to write a checkpoint of.
+ * @param checkpoint Reference to a Checkpoint object to store offset data in.
+ */
+ public void writeCheckpoint(TaskName taskName, Checkpoint checkpoint) {
+ log.debug("Writing checkpoint for Task: {} with offsets: {}", taskName.getTaskName(), checkpoint.getOffsets());
+ SetCheckpoint checkPointMessage = new SetCheckpoint(source, taskName.getTaskName(), checkpoint);
+ coordinatorStreamProducer.send(checkPointMessage);
+ }
+
+ /**
+ * Returns the last recorded checkpoint for a specified taskName.
+ * @param taskName Specific Samza taskName for which to get the last checkpoint of.
+ * @return A Checkpoint object with the recorded offset data of the specified partition.
+ */
+ public Checkpoint readLastCheckpoint(TaskName taskName) {
+ // Bootstrap each time to make sure that we are caught up with the stream; on consecutive calls the bootstrap simply catches up on new messages
+ log.debug("Reading checkpoint for Task: {}", taskName.getTaskName());
+ Set<CoordinatorStreamMessage> bootstrappedStream = coordinatorStreamConsumer.getBootstrappedStream(SetCheckpoint.TYPE);
+ for (CoordinatorStreamMessage coordinatorStreamMessage : bootstrappedStream) {
+ SetCheckpoint setCheckpoint = new SetCheckpoint(coordinatorStreamMessage);
+ TaskName taskNameInCheckpoint = new TaskName(setCheckpoint.getKey());
+ if(taskNames.contains(taskNameInCheckpoint)) {
+ taskNamesToOffsets.put(taskNameInCheckpoint, setCheckpoint.getCheckpoint());
+ log.debug("Adding checkpoint {} for taskName {}", taskNameInCheckpoint, taskName);
+ }
+ }
+ return taskNamesToOffsets.get(taskName);
+ }
+
+ public void stop() {
+ coordinatorStreamConsumer.stop();
+ coordinatorStreamProducer.stop();
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java
new file mode 100644
index 0000000..f8b705f
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java
@@ -0,0 +1,476 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator.stream;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.samza.checkpoint.Checkpoint;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.Util;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * <p>
+ * Represents a message for the job coordinator. All messages in the coordinator
+ * stream must wrap the CoordinatorStreamMessage class. Coordinator stream
+ * messages are modeled as key/value pairs. The key is a list of well defined
+ * fields: version, type, and key. The value is a map. There are some
+ * pre-defined fields (such as timestamp, host, etc) for the value map, which
+ * are common to all messages.
+ * </p>
+ *
+ * <p>
+ * The full structure for a CoordinatorStreamMessage is:
+ * </p>
+ *
+ * <pre>
+ * key => [1, "set-config", "job.name"]
+ *
+ * message => {
+ * "host": "192.168.0.1",
+ * "username": "criccomini",
+ * "source": "job-runner",
+ * "timestamp": 123456789,
+ * "values": {
+ * "value": "my-job-name"
+ * }
+ * }
+ * </pre>
+ *
+ * Where the key's structure is:
+ *
+ * <pre>
+ * key => [<version>, <type>, <key>]
+ * </pre>
+ *
+ * <p>
+ * Note that the white space in the above JSON blobs are done for legibility.
+ * Over the wire, the JSON should be compact, and no unnecessary white space
+ * should be used. This is extremely important for key serialization, since a
+ * key with [1,"set-config","job.name"] and [1, "set-config", "job.name"] will
+ * be evaluated as two different keys, and Kafka will not log compact them (if
+ * Kafka is used as the underlying system for a coordinator stream).
+ * </p>
+ *
+ * <p>
+ * The "values" map in the message is defined on a per-message-type basis. For
+ * set-config messages, there is just a single key/value pair, where the "value"
+ * key is defined. For offset messages, there will be multiple key/value pairs
+ * in "values" (one for each SystemStreamPartition/offset pair for a given
+ * TaskName).
+ * </p>
+ *
+ * <p>
+ * The most important fields are type, key, and values. The type field (defined
+ * as index 1 in the key list) defines the kind of message, the key (defined as
+ * index 2 in the key list) defines a key to associate with the values, and the
+ * values map defines a set of values associated with the type. A concrete
+ * example would be a config message of type "set-config" with key "job.name"
+ * and values {"value": "my-job-name"}.
+ * </p>
+ */
+public class CoordinatorStreamMessage {
+ public static int VERSION_INDEX = 0;
+ public static int TYPE_INDEX = 1;
+ public static int KEY_INDEX = 2;
+
+ private static final Logger log = LoggerFactory.getLogger(CoordinatorStreamMessage.class);
+
+ /**
+ * Protocol version for coordinator stream messages. This version number must
+ * be incremented any time new messages are added to the coordinator stream,
+ * or changes are made to the key/message headers.
+ */
+ public static final int VERSION = 1;
+
+ /**
+ * Contains all key fields. Currently, this includes the type of the message,
+ * the key associated with the type (e.g. type: set-config key: job.name), and
+ * the version of the protocol. The indices are defined as the INDEX static
+ * variables above.
+ */
+ private final Object[] keyArray;
+
+ /**
+ * Contains all fields for the message. This includes who sent the message,
+ * the host, etc. It also includes a "values" map, which contains all values
+ * associated with the key of the message. If set-config/job.name were used as
+ * the type/key of the message, then values would contain
+ * {"value":"my-job-name"}.
+ */
+ private final Map<String, Object> messageMap;
+ private boolean isDelete;
+
+ public CoordinatorStreamMessage(CoordinatorStreamMessage message) {
+ this(message.getKeyArray(), message.getMessageMap());
+ }
+
+ public CoordinatorStreamMessage(Object[] keyArray, Map<String, Object> messageMap) {
+ this.keyArray = keyArray;
+ this.messageMap = messageMap;
+ this.isDelete = messageMap == null;
+ }
+
+ public CoordinatorStreamMessage(String source) {
+ this(source, new Object[] { Integer.valueOf(VERSION), null, null }, new HashMap<String, Object>());
+ }
+
+ public CoordinatorStreamMessage(String source, Object[] keyArray, Map<String, Object> messageMap) {
+ this(keyArray, messageMap);
+ if (!isDelete) {
+ this.messageMap.put("values", new HashMap<String, String>());
+ setSource(source);
+ setUsername(System.getProperty("user.name"));
+ setTimestamp(System.currentTimeMillis());
+
+ try {
+ setHost(InetAddress.getLocalHost().getHostAddress());
+ } catch (UnknownHostException e) {
+ log.warn("Unable to retrieve host for current machine. Setting coordinator stream message host field to an empty string.");
+ setHost("");
+ }
+ }
+
+ setVersion(VERSION);
+ }
+
+ protected void setIsDelete(boolean isDelete) {
+ this.isDelete = isDelete;
+ }
+
+ protected void setHost(String host) {
+ messageMap.put("host", host);
+ }
+
+ protected void setUsername(String username) {
+ messageMap.put("username", username);
+ }
+
+ protected void setSource(String source) {
+ messageMap.put("source", source);
+ }
+
+ protected void setTimestamp(long timestamp) {
+ messageMap.put("timestamp", Long.valueOf(timestamp));
+ }
+
+ protected void setVersion(int version) {
+ this.keyArray[VERSION_INDEX] = Integer.valueOf(version);
+ }
+
+ protected void setType(String type) {
+ this.keyArray[TYPE_INDEX] = type;
+ }
+
+ protected void setKey(String key) {
+ this.keyArray[KEY_INDEX] = key;
+ }
+
+ @SuppressWarnings("unchecked")
+ protected Map<String, String> getMessageValues() {
+ return (Map<String, String>) this.messageMap.get("values");
+ }
+
+ protected String getMessageValue(String key) {
+ return getMessageValues().get(key);
+ }
+
+ /**
+ * @param key
+ * The key inside the messageMap, please only use human readable string (no JSON or such) - this allows
+ * easy mutation of the coordinator stream outside of Samza (scripts)
+ * @param value
+ * The value corresponding to the key, should also be a simple string
+ */
+ protected void putMessageValue(String key, String value) {
+ getMessageValues().put(key, value);
+ }
+
+ /**
+ * The type of the message is used to convert a generic
+ * CoordinatorStreamMessage into a specific message, such as a SetConfig
+ * message.
+ *
+ * @return The type of the message.
+ */
+ public String getType() {
+ return (String) this.keyArray[TYPE_INDEX];
+ }
+
+ /**
+ * @return The whole key map including both the key and type of the message.
+ */
+ public Object[] getKeyArray() {
+ return this.keyArray;
+ }
+
+ /**
+ * @return Whether the message signifies a delete or not.
+ */
+ public boolean isDelete() {
+ return isDelete;
+ }
+
+ /**
+ * @return The username of the message sender.
+ */
+ public String getUsername() {
+ return (String) this.messageMap.get("username");
+ }
+
+ /**
+ * @return The timestamp at which the message was created.
+ */
+ public long getTimestamp() {
+ return (Long) this.messageMap.get("timestamp");
+ }
+
+ /**
+ * @return The whole message map including header information.
+ */
+ public Map<String, Object> getMessageMap() {
+ if (!isDelete) {
+ Map<String, Object> immutableMap = new HashMap<String, Object>(messageMap);
+ // To make sure the values map cannot be mutated by callers, we overwrite it with an unmodifiable view of the values map.
+ immutableMap.put("values", Collections.unmodifiableMap(getMessageValues()));
+ return Collections.unmodifiableMap(immutableMap);
+ } else {
+ return null;
+ }
+ }
+
+ /**
+ * @return The source that sent the coordinator message. This is a string
+ * defined by the sender.
+ */
+ public String getSource() {
+ return (String) this.messageMap.get("source");
+ }
+
+ /**
+ * @return The protocol version that the message conforms to.
+ */
+ public int getVersion() {
+ return (Integer) this.keyArray[VERSION_INDEX];
+ }
+
+ /**
+ * @return The key for a message. The key's meaning is defined by the type of
+ * the message.
+ */
+ public String getKey() {
+ return (String) this.keyArray[KEY_INDEX];
+ }
+
+ @Override
+ public String toString() {
+ return "CoordinatorStreamMessage [keyArray=" + Arrays.toString(keyArray) + ", messageMap=" + messageMap + ", isDelete=" + isDelete + "]";
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + (isDelete ? 1231 : 1237);
+ result = prime * result + Arrays.hashCode(keyArray);
+ result = prime * result + ((messageMap == null) ? 0 : messageMap.hashCode());
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null)
+ return false;
+ if (getClass() != obj.getClass())
+ return false;
+ CoordinatorStreamMessage other = (CoordinatorStreamMessage) obj;
+ if (isDelete != other.isDelete)
+ return false;
+ if (!Arrays.equals(keyArray, other.keyArray))
+ return false;
+ if (messageMap == null) {
+ if (other.messageMap != null)
+ return false;
+ } else if (!messageMap.equals(other.messageMap))
+ return false;
+ return true;
+ }
+
+ /**
+ * A coordinator stream message that tells the job coordinator to set a
+ * specific configuration.
+ */
+ public static class SetConfig extends CoordinatorStreamMessage {
+ public static final String TYPE = "set-config";
+
+ public SetConfig(CoordinatorStreamMessage message) {
+ super(message.getKeyArray(), message.getMessageMap());
+ }
+
+ public SetConfig(String source, String key, String value) {
+ super(source);
+ setType(TYPE);
+ setKey(key);
+ putMessageValue("value", value);
+ }
+
+ public String getConfigValue() {
+ return (String) getMessageValue("value");
+ }
+ }
+
+ public static class Delete extends CoordinatorStreamMessage {
+ public Delete(String source, String key, String type) {
+ this(source, key, type, VERSION);
+ }
+
+ /**
+ * <p>
+ * Delete messages must take the type of another CoordinatorStreamMessage
+ * (e.g. SetConfig) to define the type of message that's being deleted.
+ * Considering Kafka's log compaction, for example, the keys of a message
+ * and its delete key must match exactly:
+ * </p>
+ *
+ * <pre>
+ * k=>[1,"set-config","job.name"] .. v=> {..some stuff..}
+ * k=>[1,"set-config","job.name"] .. v=> null
+ * </pre>
+ *
+ * <p>
+ * Deletes are modeled as a CoordinatorStreamMessage with a null message
+ * map, and a key that's identical to the key map that's to be deleted.
+ * </p>
+ *
+ * @param source
+ * The source ID of the sender of the delete message.
+ * @param key
+ * The key to delete.
+ * @param type
+ * The type of message to delete. Must correspond to one of the
+ * other CoordinatorStreamMessages.
+ * @param version
+ * The protocol version.
+ */
+ public Delete(String source, String key, String type, int version) {
+ super(source);
+ setType(type);
+ setKey(key);
+ setVersion(version);
+ setIsDelete(true);
+ }
+ }
+
+ /**
+ * The SetCheckpoint is used to store the checkpoint messages for a particular task.
+ * The structure looks like:
+ * {
+ * Key: TaskName
+ * Type: set-checkpoint
+ * Source: ContainerID
+ * MessageMap:
+ * {
+ * SSP1 : offset,
+ * SSP2 : offset
+ * }
+ * }
+ */
+ public static class SetCheckpoint extends CoordinatorStreamMessage {
+ public static final String TYPE = "set-checkpoint";
+
+ public SetCheckpoint(CoordinatorStreamMessage message) {
+ super(message.getKeyArray(), message.getMessageMap());
+ }
+
+ /**
+ *
+ * @param source The source writing the checkpoint
+ * @param key The key for the checkpoint message (Typically task name)
+ * @param checkpoint Checkpoint message to be written to the stream
+ */
+ public SetCheckpoint(String source, String key, Checkpoint checkpoint) {
+ super(source);
+ setType(TYPE);
+ setKey(key);
+ Map<SystemStreamPartition, String> offsets = checkpoint.getOffsets();
+ for (Map.Entry<SystemStreamPartition, String> systemStreamPartitionStringEntry : offsets.entrySet()) {
+ putMessageValue(Util.sspToString(systemStreamPartitionStringEntry.getKey()), systemStreamPartitionStringEntry.getValue());
+ }
+ }
+
+ public Checkpoint getCheckpoint() {
+ Map<SystemStreamPartition, String> offsetMap = new HashMap<SystemStreamPartition, String>();
+ for (Map.Entry<String, String> sspToOffsetEntry : getMessageValues().entrySet()) {
+ offsetMap.put(Util.stringToSsp(sspToOffsetEntry.getKey()), sspToOffsetEntry.getValue());
+ }
+ return new Checkpoint(offsetMap);
+ }
+ }
+
+ /**
+ * The SetChangelogMapping is used to store the changelog partition information for a particular task.
+ * The structure looks like:
+ * {
+ * Key: TaskName
+ * Type: set-changelog
+ * Source: ContainerID
+ * MessageMap:
+ * {
+ * "Partition" : partitionNumber (The key is just a dummy key here, the value contains the actual partition)
+ * }
+ * }
+ */
+ public static class SetChangelogMapping extends CoordinatorStreamMessage {
+ public static final String TYPE = "set-changelog";
+
+ public SetChangelogMapping(CoordinatorStreamMessage message) {
+ super(message.getKeyArray(), message.getMessageMap());
+ }
+
+ /**
+ *
+ * @param source Source writing the change log mapping
+ * @param taskName The task name to be used in the mapping
+ * @param changelogPartitionNumber The partition to which the task's changelog is mapped to
+ */
+ public SetChangelogMapping(String source, String taskName, int changelogPartitionNumber) {
+ super(source);
+ setType(TYPE);
+ setKey(taskName);
+ putMessageValue("Partition", String.valueOf(changelogPartitionNumber));
+ }
+
+ public String getTaskName() {
+ return getKey();
+ }
+
+ public int getPartition() {
+ return Integer.parseInt(getMessageValue("Partition"));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
new file mode 100644
index 0000000..2134603
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemConsumer.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator.stream;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.coordinator.stream.CoordinatorStreamMessage.SetConfig;
+import org.apache.samza.serializers.JsonSerde;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemConsumer;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.system.SystemStreamPartitionIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A wrapper around a SystemConsumer that provides helpful methods for dealing
+ * with the coordinator stream.
+ */
+public class CoordinatorStreamSystemConsumer {
+ private static final Logger log = LoggerFactory.getLogger(CoordinatorStreamSystemConsumer.class);
+
+ private final Serde<List<?>> keySerde;
+ private final Serde<Map<String, Object>> messageSerde;
+ private final SystemStreamPartition coordinatorSystemStreamPartition;
+ private final SystemConsumer systemConsumer;
+ private final SystemAdmin systemAdmin;
+ private final Map<String, String> configMap;
+ private boolean isBootstrapped;
+ private boolean isStarted;
+ private Set<CoordinatorStreamMessage> bootstrappedStreamSet = new HashSet<CoordinatorStreamMessage>();
+
+ public CoordinatorStreamSystemConsumer(SystemStream coordinatorSystemStream, SystemConsumer systemConsumer, SystemAdmin systemAdmin, Serde<List<?>> keySerde, Serde<Map<String, Object>> messageSerde) {
+ this.coordinatorSystemStreamPartition = new SystemStreamPartition(coordinatorSystemStream, new Partition(0));
+ this.systemConsumer = systemConsumer;
+ this.systemAdmin = systemAdmin;
+ this.configMap = new HashMap<String, String>();
+ this.isBootstrapped = false;
+ this.keySerde = keySerde;
+ this.messageSerde = messageSerde;
+ }
+
+ public CoordinatorStreamSystemConsumer(SystemStream coordinatorSystemStream, SystemConsumer systemConsumer, SystemAdmin systemAdmin) {
+ this(coordinatorSystemStream, systemConsumer, systemAdmin, new JsonSerde<List<?>>(), new JsonSerde<Map<String, Object>>());
+ }
+
+ /**
+ * Retrieves the oldest offset in the coordinator stream, and registers the
+ * coordinator stream with the SystemConsumer using the earliest offset.
+ */
+ public void register() {
+ log.debug("Attempting to register: {}", coordinatorSystemStreamPartition);
+ Set<String> streamNames = new HashSet<String>();
+ String streamName = coordinatorSystemStreamPartition.getStream();
+ streamNames.add(streamName);
+ Map<String, SystemStreamMetadata> systemStreamMetadataMap = systemAdmin.getSystemStreamMetadata(streamNames);
+
+ if (systemStreamMetadataMap == null) {
+ throw new SamzaException("Received a null systemStreamMetadataMap from the systemAdmin. This is illegal.");
+ }
+
+ SystemStreamMetadata systemStreamMetadata = systemStreamMetadataMap.get(streamName);
+
+ if (systemStreamMetadata == null) {
+ throw new SamzaException("Expected " + streamName + " to be in system stream metadata.");
+ }
+
+ SystemStreamPartitionMetadata systemStreamPartitionMetadata = systemStreamMetadata.getSystemStreamPartitionMetadata().get(coordinatorSystemStreamPartition.getPartition());
+
+ if (systemStreamPartitionMetadata == null) {
+ throw new SamzaException("Expected metadata for " + coordinatorSystemStreamPartition + " to exist.");
+ }
+
+ String startingOffset = systemStreamPartitionMetadata.getOldestOffset();
+ log.debug("Registering {} with offset {}", coordinatorSystemStreamPartition, startingOffset);
+ systemConsumer.register(coordinatorSystemStreamPartition, startingOffset);
+ }
+
+ /**
+ * Starts the underlying SystemConsumer.
+ */
+ public void start() {
+ if(isStarted)
+ {
+ log.info("Coordinator stream consumer already started");
+ return;
+ }
+ log.info("Starting coordinator stream system consumer.");
+ systemConsumer.start();
+ isStarted = true;
+ }
+
+ /**
+ * Stops the underlying SystemConsumer.
+ */
+ public void stop() {
+ log.info("Stopping coordinator stream system consumer.");
+ systemConsumer.stop();
+ }
+
+ /**
+ * Read all messages from the earliest offset, all the way to the latest.
+ * Currently, this method only pays attention to config messages.
+ */
+ public void bootstrap() {
+ log.info("Bootstrapping configuration from coordinator stream.");
+ SystemStreamPartitionIterator iterator = new SystemStreamPartitionIterator(systemConsumer, coordinatorSystemStreamPartition);
+
+ try {
+ while (iterator.hasNext()) {
+ IncomingMessageEnvelope envelope = iterator.next();
+ Object[] keyArray = keySerde.fromBytes((byte[]) envelope.getKey()).toArray();
+ Map<String, Object> valueMap = null;
+ if (envelope.getMessage() != null) {
+ valueMap = messageSerde.fromBytes((byte[]) envelope.getMessage());
+ }
+ CoordinatorStreamMessage coordinatorStreamMessage = new CoordinatorStreamMessage(keyArray, valueMap);
+ log.debug("Received coordinator stream message: {}", coordinatorStreamMessage);
+ bootstrappedStreamSet.add(coordinatorStreamMessage);
+ if (SetConfig.TYPE.equals(coordinatorStreamMessage.getType())) {
+ String configKey = coordinatorStreamMessage.getKey();
+ if (coordinatorStreamMessage.isDelete()) {
+ configMap.remove(configKey);
+ } else {
+ String configValue = new SetConfig(coordinatorStreamMessage).getConfigValue();
+ configMap.put(configKey, configValue);
+ }
+ }
+ }
+ log.debug("Bootstrapped configuration: {}", configMap);
+ isBootstrapped = true;
+ } catch (Exception e) {
+ throw new SamzaException(e);
+ }
+ }
+
+ public Set<CoordinatorStreamMessage> getBoostrappedStream() {
+ log.info("Returning the bootstrapped data from the stream");
+ if(!isBootstrapped)
+ bootstrap();
+ return bootstrappedStreamSet;
+ }
+
+ public Set<CoordinatorStreamMessage> getBootstrappedStream(String type) {
+ log.debug("Bootstrapping coordinator stream for messages of type {}", type);
+ bootstrap();
+ HashSet<CoordinatorStreamMessage> bootstrappedStream = new HashSet<CoordinatorStreamMessage>();
+ for (CoordinatorStreamMessage coordinatorStreamMessage : bootstrappedStreamSet) {
+ if(type.equalsIgnoreCase(coordinatorStreamMessage.getType())) {
+ bootstrappedStream.add(coordinatorStreamMessage);
+ }
+ }
+ return bootstrappedStream;
+ }
+
+ /**
+ * @return The bootstrapped configuration that's been read after bootstrap has
+ * been invoked.
+ */
+ public Config getConfig() {
+ if (isBootstrapped) {
+ return new MapConfig(configMap);
+ } else {
+ throw new SamzaException("Must call bootstrap before retrieving config.");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemProducer.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemProducer.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemProducer.java
new file mode 100644
index 0000000..0f3e10e
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamSystemProducer.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator.stream;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.serializers.JsonSerde;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.SystemProducer;
+import org.apache.samza.system.SystemStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A wrapper around a SystemProducer that provides helpful methods for dealing
+ * with the coordinator stream.
+ */
+public class CoordinatorStreamSystemProducer {
+ private static final Logger log = LoggerFactory.getLogger(CoordinatorStreamSystemProducer.class);
+
+ private final Serde<List<?>> keySerde;
+ private final Serde<Map<String, Object>> messageSerde;
+ private final SystemStream systemStream;
+ private final SystemProducer systemProducer;
+ private final SystemAdmin systemAdmin;
+ private boolean isStarted;
+
+ public CoordinatorStreamSystemProducer(SystemStream systemStream, SystemProducer systemProducer, SystemAdmin systemAdmin) {
+ this(systemStream, systemProducer, systemAdmin, new JsonSerde<List<?>>(), new JsonSerde<Map<String, Object>>());
+ }
+
+ public CoordinatorStreamSystemProducer(SystemStream systemStream, SystemProducer systemProducer, SystemAdmin systemAdmin, Serde<List<?>> keySerde, Serde<Map<String, Object>> messageSerde) {
+ this.systemStream = systemStream;
+ this.systemProducer = systemProducer;
+ this.systemAdmin = systemAdmin;
+ this.keySerde = keySerde;
+ this.messageSerde = messageSerde;
+ }
+
+ /**
+ * Registers a source with the underlying SystemProducer.
+ *
+ * @param source
+ * The source to register.
+ */
+ public void register(String source) {
+ systemProducer.register(source);
+ }
+
+ /**
+ * Creates the coordinator stream, and starts the system producer.
+ */
+ public void start() {
+ if(isStarted)
+ {
+ log.info("Coordinator stream producer already started");
+ return;
+ }
+ log.info("Starting coordinator stream producer.");
+ systemProducer.start();
+ isStarted = true;
+ }
+
+ /**
+ * Stops the underlying SystemProducer.
+ */
+ public void stop() {
+ log.info("Stopping coordinator stream producer.");
+ systemProducer.stop();
+ }
+
+ /**
+ * Serialize and send a coordinator stream message.
+ *
+ * @param message
+ * The message to send.
+ */
+ public void send(CoordinatorStreamMessage message) {
+ log.debug("Sending {}", message);
+ try {
+ String source = message.getSource();
+ byte[] key = keySerde.toBytes(Arrays.asList(message.getKeyArray()));
+ byte[] value = null;
+ if (!message.isDelete()) {
+ value = messageSerde.toBytes(message.getMessageMap());
+ }
+ OutgoingMessageEnvelope envelope = new OutgoingMessageEnvelope(systemStream, Integer.valueOf(0), key, value);
+ systemProducer.send(source, envelope);
+ } catch (Exception e) {
+ throw new SamzaException(e);
+ }
+ }
+
+ /**
+ * Helper method that sends a series of SetConfig messages to the coordinator
+ * stream.
+ *
+ * @param source
+ * An identifier to denote which source is sending a message. This
+ * can be any arbitrary string.
+ * @param config
+ * The config object to store in the coordinator stream.
+ */
+ public void writeConfig(String source, Config config) {
+ log.debug("Writing config: {}", config);
+ for (Map.Entry<String, String> configPair : config.entrySet()) {
+ send(new CoordinatorStreamMessage.SetConfig(source, configPair.getKey(), configPair.getValue()));
+ }
+ systemProducer.flush(source);
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/main/java/org/apache/samza/job/model/TaskModel.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/job/model/TaskModel.java b/samza-core/src/main/java/org/apache/samza/job/model/TaskModel.java
index eb22d2e..c26690a 100644
--- a/samza-core/src/main/java/org/apache/samza/job/model/TaskModel.java
+++ b/samza-core/src/main/java/org/apache/samza/job/model/TaskModel.java
@@ -20,11 +20,13 @@
package org.apache.samza.job.model;
import java.util.Collections;
+import java.util.Map;
import java.util.Set;
import org.apache.samza.Partition;
import org.apache.samza.container.TaskName;
import org.apache.samza.system.SystemStreamPartition;
+
/**
* <p>
* The data model used to represent a task. The model is used in the job
@@ -39,12 +41,12 @@ import org.apache.samza.system.SystemStreamPartition;
*/
public class TaskModel implements Comparable<TaskModel> {
private final TaskName taskName;
- private final Set<SystemStreamPartition> systemStreamPartitions;
+ private final Map<SystemStreamPartition, String> systemStreamPartitionsToOffsets;
private final Partition changelogPartition;
- public TaskModel(TaskName taskName, Set<SystemStreamPartition> systemStreamPartitions, Partition changelogPartition) {
+ public TaskModel(TaskName taskName, Map<SystemStreamPartition, String> systemStreamPartitionsToOffsets, Partition changelogPartition) {
this.taskName = taskName;
- this.systemStreamPartitions = Collections.unmodifiableSet(systemStreamPartitions);
+ this.systemStreamPartitionsToOffsets = Collections.unmodifiableMap(systemStreamPartitionsToOffsets);
this.changelogPartition = changelogPartition;
}
@@ -53,55 +55,55 @@ public class TaskModel implements Comparable<TaskModel> {
}
public Set<SystemStreamPartition> getSystemStreamPartitions() {
- return systemStreamPartitions;
+ return systemStreamPartitionsToOffsets.keySet();
}
public Partition getChangelogPartition() {
return changelogPartition;
}
- @Override
- public String toString() {
- return "TaskModel [taskName=" + taskName + ", systemStreamPartitions=" + systemStreamPartitions + ", changeLogPartition=" + changelogPartition + "]";
+ public Map<SystemStreamPartition, String> getCheckpointedOffsets() {
+ return systemStreamPartitionsToOffsets;
}
@Override
- public int hashCode() {
- final int prime = 31;
- int result = 1;
- result = prime * result + ((changelogPartition == null) ? 0 : changelogPartition.hashCode());
- result = prime * result + ((systemStreamPartitions == null) ? 0 : systemStreamPartitions.hashCode());
- result = prime * result + ((taskName == null) ? 0 : taskName.hashCode());
- return result;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj)
+ public boolean equals(Object o) {
+ if (this == o) {
return true;
- if (obj == null)
- return false;
- if (getClass() != obj.getClass())
+ }
+ if (o == null || getClass() != o.getClass()) {
return false;
- TaskModel other = (TaskModel) obj;
- if (changelogPartition == null) {
- if (other.changelogPartition != null)
- return false;
- } else if (!changelogPartition.equals(other.changelogPartition))
+ }
+
+ TaskModel taskModel = (TaskModel) o;
+
+ if (!changelogPartition.equals(taskModel.changelogPartition)) {
return false;
- if (systemStreamPartitions == null) {
- if (other.systemStreamPartitions != null)
- return false;
- } else if (!systemStreamPartitions.equals(other.systemStreamPartitions))
+ }
+ if (!systemStreamPartitionsToOffsets.equals(taskModel.systemStreamPartitionsToOffsets)) {
return false;
- if (taskName == null) {
- if (other.taskName != null)
- return false;
- } else if (!taskName.equals(other.taskName))
+ }
+ if (!taskName.equals(taskModel.taskName)) {
return false;
+ }
+
return true;
}
+ @Override
+ public int hashCode() {
+ int result = taskName.hashCode();
+ result = 31 * result + systemStreamPartitionsToOffsets.hashCode();
+ result = 31 * result + changelogPartition.hashCode();
+ return result;
+ }
+
+ @Override
+
+ public String toString() {
+ return "TaskModel [taskName=" + taskName + ", systemStreamPartitions=" + systemStreamPartitionsToOffsets.keySet() + ", changeLogPartition=" + changelogPartition + "]";
+ }
+
public int compareTo(TaskModel other) {
return taskName.compareTo(other.getTaskName());
}
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/main/java/org/apache/samza/serializers/model/JsonTaskModelMixIn.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/serializers/model/JsonTaskModelMixIn.java b/samza-core/src/main/java/org/apache/samza/serializers/model/JsonTaskModelMixIn.java
index 7dc431c..172358a 100644
--- a/samza-core/src/main/java/org/apache/samza/serializers/model/JsonTaskModelMixIn.java
+++ b/samza-core/src/main/java/org/apache/samza/serializers/model/JsonTaskModelMixIn.java
@@ -19,7 +19,7 @@
package org.apache.samza.serializers.model;
-import java.util.Set;
+import java.util.Map;
import org.apache.samza.Partition;
import org.apache.samza.container.TaskName;
import org.apache.samza.system.SystemStreamPartition;
@@ -31,14 +31,14 @@ import org.codehaus.jackson.annotate.JsonProperty;
*/
public abstract class JsonTaskModelMixIn {
@JsonCreator
- public JsonTaskModelMixIn(@JsonProperty("task-name") TaskName taskName, @JsonProperty("system-stream-partitions") Set<SystemStreamPartition> systemStreamPartitions, @JsonProperty("changelog-partition") Partition changelogPartition) {
+ public JsonTaskModelMixIn(@JsonProperty("task-name") TaskName taskName, @JsonProperty("system-stream-partitions-offsets") Map<SystemStreamPartition, String> systemStreamPartitionsToOffsets, @JsonProperty("changelog-partition") Partition changelogPartition) {
}
@JsonProperty("task-name")
abstract TaskName getTaskName();
- @JsonProperty("system-stream-partitions")
- abstract Set<SystemStreamPartition> getSystemStreamPartitions();
+ @JsonProperty("system-stream-partitions-offsets")
+ abstract Map<SystemStreamPartition, String> getCheckpointedOffsets();
@JsonProperty("changelog-partition")
abstract Partition getChangelogPartition();
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java b/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java
index 3517912..17410c5 100644
--- a/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java
+++ b/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java
@@ -29,7 +29,9 @@ import org.apache.samza.container.TaskName;
import org.apache.samza.job.model.ContainerModel;
import org.apache.samza.job.model.JobModel;
import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.system.SystemStream;
import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.Util;
import org.codehaus.jackson.JsonGenerator;
import org.codehaus.jackson.JsonNode;
import org.codehaus.jackson.JsonParser;
@@ -39,9 +41,11 @@ import org.codehaus.jackson.Version;
import org.codehaus.jackson.map.DeserializationContext;
import org.codehaus.jackson.map.JsonDeserializer;
import org.codehaus.jackson.map.JsonSerializer;
+import org.codehaus.jackson.map.KeyDeserializer;
import org.codehaus.jackson.map.MapperConfig;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.map.PropertyNamingStrategy;
+import org.codehaus.jackson.map.SerializationConfig;
import org.codehaus.jackson.map.SerializerProvider;
import org.codehaus.jackson.map.introspect.AnnotatedField;
import org.codehaus.jackson.map.introspect.AnnotatedMethod;
@@ -76,9 +80,11 @@ public class SamzaObjectMapper {
// Setup custom serdes for simple data types.
module.addSerializer(Partition.class, new PartitionSerializer());
module.addSerializer(SystemStreamPartition.class, new SystemStreamPartitionSerializer());
+ module.addKeySerializer(SystemStreamPartition.class, new SystemStreamPartitionKeySerializer());
module.addSerializer(TaskName.class, new TaskNameSerializer());
module.addDeserializer(Partition.class, new PartitionDeserializer());
module.addDeserializer(SystemStreamPartition.class, new SystemStreamPartitionDeserializer());
+ module.addKeyDeserializer(SystemStreamPartition.class, new SystemStreamPartitionKeyDeserializer());
module.addDeserializer(Config.class, new ConfigDeserializer());
// Setup mixins for data models.
@@ -88,6 +94,7 @@ public class SamzaObjectMapper {
mapper.getDeserializationConfig().addMixInAnnotations(ContainerModel.class, JsonContainerModelMixIn.class);
mapper.getSerializationConfig().addMixInAnnotations(JobModel.class, JsonJobModelMixIn.class);
mapper.getDeserializationConfig().addMixInAnnotations(JobModel.class, JsonJobModelMixIn.class);
+ mapper.configure(SerializationConfig.Feature.AUTO_DETECT_GETTERS, false);
// Convert camel case to hyphenated field names, and register the module.
mapper.setPropertyNamingStrategy(new CamelCaseToDashesStrategy());
@@ -138,6 +145,23 @@ public class SamzaObjectMapper {
}
}
+ public static class SystemStreamPartitionKeySerializer extends JsonSerializer<SystemStreamPartition> {
+ @Override
+ public void serialize(SystemStreamPartition systemStreamPartition, JsonGenerator jgen, SerializerProvider provider)
+ throws IOException {
+ String ssp = Util.sspToString(systemStreamPartition);
+ jgen.writeFieldName(ssp);
+ }
+ }
+
+ public static class SystemStreamPartitionKeyDeserializer extends KeyDeserializer {
+ @Override
+ public Object deserializeKey(String sspString, DeserializationContext ctxt)
+ throws IOException {
+ return Util.stringToSsp(sspString);
+ }
+ }
+
public static class SystemStreamPartitionSerializer extends JsonSerializer<SystemStreamPartition> {
@Override
public void serialize(SystemStreamPartition systemStreamPartition, JsonGenerator jsonGenerator, SerializerProvider provider) throws IOException, JsonProcessingException {
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/main/java/org/apache/samza/storage/ChangelogPartitionManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/storage/ChangelogPartitionManager.java b/samza-core/src/main/java/org/apache/samza/storage/ChangelogPartitionManager.java
new file mode 100644
index 0000000..fff7634
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/storage/ChangelogPartitionManager.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.coordinator.stream.CoordinatorStreamMessage;
+import org.apache.samza.coordinator.stream.CoordinatorStreamMessage.SetChangelogMapping;
+import org.apache.samza.coordinator.stream.CoordinatorStreamSystemConsumer;
+import org.apache.samza.coordinator.stream.CoordinatorStreamSystemProducer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * The Changelog manager is used to persist and read the changelog information from the coordinator stream.
+ */
+public class ChangelogPartitionManager {
+
+ private static final Logger log = LoggerFactory.getLogger(ChangelogPartitionManager.class);
+ private final CoordinatorStreamSystemProducer coordinatorStreamProducer;
+ private final CoordinatorStreamSystemConsumer coordinatorStreamConsumer;
+ private boolean isCoordinatorConsumerRegistered = false;
+ private String source;
+
+ public ChangelogPartitionManager(CoordinatorStreamSystemProducer coordinatorStreamProducer,
+ CoordinatorStreamSystemConsumer coordinatorStreamConsumer) {
+ this.coordinatorStreamConsumer = coordinatorStreamConsumer;
+ this.coordinatorStreamProducer = coordinatorStreamProducer;
+ this.source = "Unknown";
+ }
+
+ public ChangelogPartitionManager(CoordinatorStreamSystemProducer coordinatorStreamProducer,
+ CoordinatorStreamSystemConsumer coordinatorStreamConsumer,
+ String source) {
+ this(coordinatorStreamProducer, coordinatorStreamConsumer);
+ this.source = source;
+ }
+
+ public void start() {
+ coordinatorStreamProducer.start();
+ coordinatorStreamConsumer.start();
+ }
+
+ public void stop() {
+ coordinatorStreamConsumer.stop();
+ coordinatorStreamProducer.stop();
+ }
+
+ /**
+ * Registers this manager to write changelog mapping for a particular task.
+ * @param taskName The taskname to be registered for changelog mapping.
+ */
+ public void register(TaskName taskName) {
+ log.debug("Adding taskName {} to {}", taskName, this);
+ if(!isCoordinatorConsumerRegistered) {
+ coordinatorStreamConsumer.register();
+ isCoordinatorConsumerRegistered = true;
+ }
+ coordinatorStreamProducer.register(taskName.getTaskName());
+ }
+
+ /**
+ * Read the taskName to partition mapping that is being maintained by this ChangelogManager
+ * @return TaskName to change log partition mapping, or an empty map if there were no messages.
+ */
+ public Map<TaskName, Integer> readChangeLogPartitionMapping() {
+ log.debug("Reading changelog partition information");
+ Set<CoordinatorStreamMessage> bootstrappedStream = coordinatorStreamConsumer.getBootstrappedStream(SetChangelogMapping.TYPE);
+ HashMap<TaskName, Integer> changelogMapping = new HashMap<TaskName, Integer>();
+ for (CoordinatorStreamMessage coordinatorStreamMessage : bootstrappedStream) {
+ SetChangelogMapping changelogMapEntry = new SetChangelogMapping(coordinatorStreamMessage);
+ changelogMapping.put(new TaskName(changelogMapEntry.getTaskName()), changelogMapEntry.getPartition());
+ log.debug("TaskName: {} is mapped to {}", changelogMapEntry.getTaskName(), changelogMapEntry.getPartition());
+ }
+ return changelogMapping;
+ }
+
+ /**
+ * Write the taskName to partition mapping that is being maintained by this ChangelogManager
+ * @param changelogEntries The entries that needs to be written to the coordinator stream, the map takes the taskName
+ * and it's corresponding changelog partition.
+ */
+ public void writeChangeLogPartitionMapping(Map<TaskName, Integer> changelogEntries) {
+ log.debug("Updating changelog information with: ");
+ for (Map.Entry<TaskName, Integer> entry : changelogEntries.entrySet()) {
+ log.debug("TaskName: {} to Partition: {}", entry.getKey().getTaskName(), entry.getValue());
+ SetChangelogMapping changelogMapping = new SetChangelogMapping(source,
+ entry.getKey().getTaskName(),
+ entry.getValue());
+ coordinatorStreamProducer.send(changelogMapping);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala b/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
index ddc30af..2e3aeb8 100644
--- a/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
+++ b/samza-core/src/main/scala/org/apache/samza/checkpoint/CheckpointTool.scala
@@ -26,14 +26,18 @@ import org.apache.samza.checkpoint.CheckpointTool.TaskNameToCheckpointMap
import org.apache.samza.config.TaskConfig.Config2Task
import org.apache.samza.config.{Config, StreamConfig}
import org.apache.samza.container.TaskName
+import org.apache.samza.coordinator.stream.CoordinatorStreamSystemFactory
import org.apache.samza.metrics.MetricsRegistryMap
import org.apache.samza.system.SystemStreamPartition
-import org.apache.samza.util.{CommandLine, Util}
+import org.apache.samza.util.CommandLine
import org.apache.samza.{Partition, SamzaException}
import scala.collection.JavaConversions._
import org.apache.samza.util.Logging
import org.apache.samza.coordinator.JobCoordinator
+import scala.collection.immutable.HashMap
+
+
/**
* Command-line tool for inspecting and manipulating the checkpoints for a job.
* This can be used, for example, to force a job to re-process a stream from the
@@ -113,31 +117,30 @@ object CheckpointTool {
}
}
+ def apply(config: Config, offsets: TaskNameToCheckpointMap) = {
+ val factory = new CoordinatorStreamSystemFactory
+ val coordinatorStreamConsumer = factory.getCoordinatorStreamSystemConsumer(config, new MetricsRegistryMap())
+ val coordinatorStreamProducer = factory.getCoordinatorStreamSystemProducer(config, new MetricsRegistryMap())
+ val manager = new CheckpointManager(coordinatorStreamProducer, coordinatorStreamConsumer, "checkpoint-tool")
+ new CheckpointTool(config, offsets, manager)
+ }
+
def main(args: Array[String]) {
val cmdline = new CheckpointToolCommandLine
val options = cmdline.parser.parse(args: _*)
val config = cmdline.loadConfig(options)
- val tool = new CheckpointTool(config, cmdline.newOffsets)
+ val tool = CheckpointTool(config, cmdline.newOffsets)
tool.run
}
}
-class CheckpointTool(config: Config, newOffsets: TaskNameToCheckpointMap) extends Logging {
- val manager = config.getCheckpointManagerFactory match {
- case Some(className) =>
- Util.getObj[CheckpointManagerFactory](className).getCheckpointManager(config, new MetricsRegistryMap)
- case _ =>
- throw new SamzaException("This job does not use checkpointing (task.checkpoint.factory is not set).")
- }
-
- // The CheckpointManagerFactory needs to perform this same operation when initializing
- // the manager. TODO figure out some way of avoiding duplicated work.
+class CheckpointTool(config: Config, newOffsets: TaskNameToCheckpointMap, manager: CheckpointManager) extends Logging {
def run {
info("Using %s" format manager)
// Find all the TaskNames that would be generated for this job config
- val coordinator = JobCoordinator(config, 1)
+ val coordinator = JobCoordinator(config)
val taskNames = coordinator
.jobModel
.getContainers
@@ -165,7 +168,10 @@ class CheckpointTool(config: Config, newOffsets: TaskNameToCheckpointMap) extend
/** Load the most recent checkpoint state for all a specified TaskName. */
def readLastCheckpoint(taskName:TaskName): Map[SystemStreamPartition, String] = {
- manager.readLastCheckpoint(taskName).getOffsets.toMap
+ Option(manager.readLastCheckpoint(taskName))
+ .getOrElse(new Checkpoint(new HashMap[SystemStreamPartition, String]()))
+ .getOffsets
+ .toMap
}
/**
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
index 85c1749..20e5d26 100644
--- a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
@@ -72,8 +72,8 @@ object OffsetManager extends Logging {
config: Config,
checkpointManager: CheckpointManager = null,
systemAdmins: Map[String, SystemAdmin] = Map(),
- offsetManagerMetrics : OffsetManagerMetrics = new OffsetManagerMetrics) = {
-
+ offsetManagerMetrics : OffsetManagerMetrics = new OffsetManagerMetrics,
+ latestOffsets: Map[SystemStreamPartition, String] = Map()) = {
debug("Building offset manager for %s." format systemStreamMetadata)
val offsetSettings = systemStreamMetadata
@@ -99,7 +99,7 @@ object OffsetManager extends Logging {
// Build OffsetSetting so we can create a map for OffsetManager.
(systemStream, OffsetSetting(systemStreamMetadata, defaultOffsetType, resetOffset))
}.toMap
- new OffsetManager(offsetSettings, checkpointManager, systemAdmins, offsetManagerMetrics)
+ new OffsetManager(offsetSettings, checkpointManager, systemAdmins, offsetManagerMetrics, latestOffsets)
}
}
@@ -142,12 +142,19 @@ class OffsetManager(
/**
* offsetManagerMetrics for keeping track of checkpointed offsets of each SystemStreamPartition.
*/
- val offsetManagerMetrics : OffsetManagerMetrics = new OffsetManagerMetrics ) extends Logging {
+ val offsetManagerMetrics : OffsetManagerMetrics = new OffsetManagerMetrics,
+
+ /*
+ * The previously read checkpoints restored from the coordinator stream
+ */
+ val previousCheckpointedOffsets: Map[SystemStreamPartition, String] = Map()
+ ) extends Logging {
/**
* Last offsets processed for each SystemStreamPartition.
*/
- var lastProcessedOffsets = Map[SystemStreamPartition, String]()
+ // Filter out null offset values, we can't use them, these exist only because of SSP information
+ var lastProcessedOffsets = previousCheckpointedOffsets.filter(_._2 != null)
/**
* Offsets to start reading from for each SystemStreamPartition. This
@@ -170,7 +177,8 @@ class OffsetManager(
def start {
registerCheckpointManager
- loadOffsetsFromCheckpointManager
+ initializeCheckpointManager
+ loadOffsets
stripResetStreams
loadStartingOffsets
loadDefaults
@@ -240,18 +248,21 @@ class OffsetManager(
}
}
+ private def initializeCheckpointManager {
+ if (checkpointManager != null) {
+ checkpointManager.start
+ } else {
+ debug("Skipping offset load from checkpoint manager because no manager was defined.")
+ }
+ }
+
/**
* Loads last processed offsets from checkpoint manager for all registered
* partitions.
*/
- private def loadOffsetsFromCheckpointManager {
- if (checkpointManager != null) {
- debug("Loading offsets from checkpoint manager.")
-
- checkpointManager.start
-
- lastProcessedOffsets ++= systemStreamPartitions.keys
- .flatMap(restoreOffsetsFromCheckpoint(_)).filter {
+ private def loadOffsets {
+ debug("Loading offsets")
+ lastProcessedOffsets.filter {
case (systemStreamPartition, offset) =>
val shouldKeep = offsetSettings.contains(systemStreamPartition.getSystemStream)
if (!shouldKeep) {
@@ -260,26 +271,7 @@ class OffsetManager(
info("Checkpointed offset is currently %s for %s." format (offset, systemStreamPartition))
shouldKeep
}
- } else {
- debug("Skipping offset load from checkpoint manager because no manager was defined.")
- }
- }
-
- /**
- * Loads last processed offsets for a single taskName.
- */
- private def restoreOffsetsFromCheckpoint(taskName: TaskName): Map[SystemStreamPartition, String] = {
- debug("Loading checkpoints for taskName: %s." format taskName)
-
- val checkpoint = checkpointManager.readLastCheckpoint(taskName)
- if (checkpoint != null) {
- checkpoint.getOffsets.toMap
- } else {
- info("Did not receive a checkpoint for taskName %s. Proceeding without a checkpoint." format taskName)
-
- Map()
- }
}
/**
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/main/scala/org/apache/samza/checkpoint/file/FileSystemCheckpointManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/checkpoint/file/FileSystemCheckpointManager.scala b/samza-core/src/main/scala/org/apache/samza/checkpoint/file/FileSystemCheckpointManager.scala
deleted file mode 100644
index 2a87a6e..0000000
--- a/samza-core/src/main/scala/org/apache/samza/checkpoint/file/FileSystemCheckpointManager.scala
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.checkpoint.file
-
-import java.io.File
-import java.io.FileNotFoundException
-import java.io.FileOutputStream
-import java.util
-import org.apache.samza.SamzaException
-import org.apache.samza.checkpoint.Checkpoint
-import org.apache.samza.checkpoint.CheckpointManager
-import org.apache.samza.checkpoint.CheckpointManagerFactory
-import org.apache.samza.config.Config
-import org.apache.samza.config.FileSystemCheckpointManagerConfig.Config2FSCP
-import org.apache.samza.config.JobConfig.Config2Job
-import org.apache.samza.container.TaskName
-import org.apache.samza.metrics.MetricsRegistry
-import org.apache.samza.serializers.CheckpointSerde
-import scala.io.Source
-
-class FileSystemCheckpointManager(
- jobName: String,
- root: File,
- serde: CheckpointSerde = new CheckpointSerde) extends CheckpointManager {
-
- override def register(taskName: TaskName):Unit = Unit
-
- def getCheckpointFile(taskName: TaskName) = getFile(jobName, taskName, "checkpoints")
-
- def writeCheckpoint(taskName: TaskName, checkpoint: Checkpoint) {
- val bytes = serde.toBytes(checkpoint)
- val fos = new FileOutputStream(getCheckpointFile(taskName))
-
- fos.write(bytes)
- fos.close
- }
-
- def readLastCheckpoint(taskName: TaskName): Checkpoint = {
- try {
- val bytes = Source.fromFile(getCheckpointFile(taskName)).map(_.toByte).toArray
-
- serde.fromBytes(bytes)
- } catch {
- case e: FileNotFoundException => null
- }
- }
-
- def start {
- if (!root.exists) {
- throw new SamzaException("Root directory for file system checkpoint manager does not exist: %s" format root)
- }
- }
-
- def stop {}
-
- private def getFile(jobName: String, taskName: TaskName, fileType:String) =
- new File(root, "%s-%s-%s" format (jobName, taskName, fileType))
-
- private def getChangeLogPartitionMappingFile() = getFile(jobName, new TaskName("partition-mapping"), "changelog-partition-mapping")
-
- override def readChangeLogPartitionMapping(): util.Map[TaskName, java.lang.Integer] = {
- try {
- val bytes = Source.fromFile(getChangeLogPartitionMappingFile()).map(_.toByte).toArray
- serde.changelogPartitionMappingFromBytes(bytes)
- } catch {
- case e: FileNotFoundException => new util.HashMap[TaskName, java.lang.Integer]()
- }
- }
-
- def writeChangeLogPartitionMapping(mapping: util.Map[TaskName, java.lang.Integer]): Unit = {
- val hashmap = new util.HashMap[TaskName, java.lang.Integer](mapping)
- val bytes = serde.changelogPartitionMappingToBytes(hashmap)
- val fos = new FileOutputStream(getChangeLogPartitionMappingFile())
-
- fos.write(bytes)
- fos.close
- }
-}
-
-class FileSystemCheckpointManagerFactory extends CheckpointManagerFactory {
- def getCheckpointManager(config: Config, registry: MetricsRegistry) = {
- val name = config
- .getName
- .getOrElse(throw new SamzaException("Missing job name in configs"))
- val root = config
- .getFileSystemCheckpointRoot
- .getOrElse(throw new SamzaException("Missing checkpoint root in configs"))
- new FileSystemCheckpointManager(name, new File(root))
- }
-}
[3/4] samza git commit: SAMZA-465: Use coordinator stream and
eliminate CheckpointManager
Posted by ni...@apache.org.
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
index 3b6685e..e4b14f4 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
@@ -20,6 +20,9 @@
package org.apache.samza.config
import org.apache.samza.container.grouper.stream.GroupByPartitionFactory
+import org.apache.samza.config.JobConfig.Config2Job
+import org.apache.samza.config.SystemConfig.Config2System
+import org.apache.samza.util.Logging
object JobConfig {
// job config constants
@@ -35,15 +38,45 @@ object JobConfig {
val CONFIG_REWRITER_CLASS = "job.config.rewriter.%s.class" // streaming.job_config_rewriter_class - regex, system, config
val JOB_NAME = "job.name" // streaming.job_name
val JOB_ID = "job.id" // streaming.job_id
-
+ val JOB_COORDINATOR_SYSTEM = "job.coordinator.system"
+ val JOB_CONTAINER_COUNT = "job.container.count"
+ val JOB_REPLICATION_FACTOR = "job.coordinator.replication.factor"
+ val JOB_SEGMENT_BYTES = "job.coordinator.segment.bytes"
val SSP_GROUPER_FACTORY = "job.systemstreampartition.grouper.factory"
implicit def Config2Job(config: Config) = new JobConfig(config)
}
-class JobConfig(config: Config) extends ScalaMapConfig(config) {
+class JobConfig(config: Config) extends ScalaMapConfig(config) with Logging {
def getName = getOption(JobConfig.JOB_NAME)
+ def getCoordinatorSystemName = getOption(JobConfig.JOB_COORDINATOR_SYSTEM).getOrElse({
+ // If no coordinator system is configured, try and guess it if there's just one system configured.
+ val systemNames = config.getSystemNames.toSet
+ if (systemNames.size == 1) {
+ val systemName = systemNames.iterator.next
+ info("No coordinator system defined, so defaulting to %s" format systemName)
+ systemName
+ } else {
+ throw new ConfigException("Missing job.coordinator.system configuration.")
+ }
+ })
+
+ def getContainerCount = {
+ getOption(JobConfig.JOB_CONTAINER_COUNT) match {
+ case Some(count) => count.toInt
+ case _ =>
+ // To maintain backwards compatibility, honor yarn.container.count for now.
+ // TODO get rid of this in a future release.
+ getOption("yarn.container.count") match {
+ case Some(count) =>
+ warn("Configuration 'yarn.container.count' is deprecated. Please use %s." format JobConfig.JOB_CONTAINER_COUNT)
+ count.toInt
+ case _ => 1
+ }
+ }
+ }
+
def getStreamJobFactoryClass = getOption(JobConfig.STREAM_JOB_FACTORY_CLASS)
def getJobId = getOption(JobConfig.JOB_ID)
@@ -53,4 +86,30 @@ class JobConfig(config: Config) extends ScalaMapConfig(config) {
def getConfigRewriterClass(name: String) = getOption(JobConfig.CONFIG_REWRITER_CLASS format name)
def getSystemStreamPartitionGrouperFactory = getOption(JobConfig.SSP_GROUPER_FACTORY).getOrElse(classOf[GroupByPartitionFactory].getCanonicalName)
+
+ val CHECKPOINT_SEGMENT_BYTES = "task.checkpoint.segment.bytes"
+
+ def getCoordinatorReplicationFactor = getOption(JobConfig.JOB_REPLICATION_FACTOR) match {
+ case Some(rplFactor) => rplFactor
+ case _ =>
+ // TODO get rid of checkpoint configs in a future release
+ getOption("task.checkpoint.replication.factor") match {
+ case Some(rplFactor) =>
+ warn("Configuration 'task.checkpoint.replication.factor' is deprecated. Please use %s." format JobConfig.JOB_REPLICATION_FACTOR)
+ rplFactor
+ case _ => "3"
+ }
+ }
+
+ def getCoordinatorSegmentBytes = getOption(JobConfig.JOB_SEGMENT_BYTES) match {
+ case Some(segBytes) => segBytes
+ case _ =>
+ // TODO get rid of checkpoint configs in a future release
+ getOption("task.checkpoint.segment.bytes") match {
+ case Some(segBytes) =>
+ warn("Configuration 'task.checkpoint.segment.bytes' is deprecated. Please use %s." format JobConfig.JOB_SEGMENT_BYTES)
+ segBytes
+ case _ => "26214400"
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala
index 1a2dd44..e94a473 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala
@@ -21,9 +21,9 @@ package org.apache.samza.config
object ShellCommandConfig {
/**
- * This environment variable is used to store a JSON serialized map of all configuration.
+ * This environment variable is used to store a JSON serialized map of all coordinator system configs.
*/
- val ENV_CONFIG = "SAMZA_CONFIG"
+ val ENV_COORDINATOR_SYSTEM_CONFIG = "SAMZA_COORDINATOR_SYSTEM_CONFIG"
/**
* The ID for a container. This is an integer number between 0 and
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala
index adef09e..e172589 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala
@@ -21,8 +21,6 @@ package org.apache.samza.config
import org.apache.samza.util.Logging
import scala.collection.JavaConversions._
-import org.apache.samza.SamzaException
-import org.apache.samza.util.Util
import org.apache.samza.system.SystemStream
object StreamConfig {
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
index cd06c06..0b3a235 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
@@ -19,8 +19,8 @@
package org.apache.samza.config
-import org.apache.samza.util.Util
import org.apache.samza.system.SystemStream
+import org.apache.samza.util.Util
object TaskConfig {
// task config constants
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 720fbdc..56819e0 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -21,7 +21,7 @@ package org.apache.samza.container
import java.io.File
import org.apache.samza.SamzaException
-import org.apache.samza.checkpoint.{ CheckpointManagerFactory, OffsetManager }
+import org.apache.samza.checkpoint.{ CheckpointManager, OffsetManager }
import org.apache.samza.config.Config
import org.apache.samza.config.MetricsConfig.Config2Metrics
import org.apache.samza.config.SerializerConfig.Config2Serializer
@@ -30,6 +30,7 @@ import org.apache.samza.config.StorageConfig.Config2Storage
import org.apache.samza.config.StreamConfig.Config2Stream
import org.apache.samza.config.SystemConfig.Config2System
import org.apache.samza.config.TaskConfig.Config2Task
+import org.apache.samza.coordinator.stream.CoordinatorStreamSystemFactory
import org.apache.samza.metrics.JmxServer
import org.apache.samza.metrics.JvmMetrics
import org.apache.samza.metrics.MetricsRegistryMap
@@ -328,17 +329,16 @@ object SamzaContainer extends Logging {
info("Got metrics reporters: %s" format reporters.keys)
- val checkpointManager = config.getCheckpointManagerFactory match {
- case Some(checkpointFactoryClassName) if (!checkpointFactoryClassName.isEmpty) =>
- Util
- .getObj[CheckpointManagerFactory](checkpointFactoryClassName)
- .getCheckpointManager(config, samzaContainerMetrics.registry)
- case _ => null
- }
+ val coordinatorSystemConsumer = new CoordinatorStreamSystemFactory().getCoordinatorStreamSystemConsumer(config, samzaContainerMetrics.registry)
+ val coordinatorSystemProducer = new CoordinatorStreamSystemFactory().getCoordinatorStreamSystemProducer(config, samzaContainerMetrics.registry)
+ val checkpointManager = new CheckpointManager(coordinatorSystemProducer, coordinatorSystemConsumer, String.valueOf(containerId))
info("Got checkpoint manager: %s" format checkpointManager)
- val offsetManager = OffsetManager(inputStreamMetadata, config, checkpointManager, systemAdmins, offsetManagerMetrics)
+ val combinedOffsets: Map[SystemStreamPartition, String] =
+ containerModel.getTasks.values().flatMap(_.getCheckpointedOffsets).toMap
+
+ val offsetManager = OffsetManager(inputStreamMetadata, config, checkpointManager, systemAdmins, offsetManagerMetrics, combinedOffsets)
info("Got offset manager: %s" format offsetManager)
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
index c14f2f6..5b43b58 100644
--- a/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
+++ b/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
@@ -19,61 +19,74 @@
package org.apache.samza.coordinator
+
import org.apache.samza.config.Config
-import org.apache.samza.job.model.JobModel
+import org.apache.samza.job.model.{ContainerModel, JobModel, TaskModel}
import org.apache.samza.SamzaException
import org.apache.samza.container.grouper.task.GroupByContainerCount
-import org.apache.samza.util.Util
-import org.apache.samza.checkpoint.CheckpointManagerFactory
import org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouperFactory
import java.util
import org.apache.samza.container.TaskName
+import org.apache.samza.storage.ChangelogPartitionManager
import org.apache.samza.util.Logging
-import org.apache.samza.config.TaskConfig.Config2Task
import org.apache.samza.metrics.MetricsRegistryMap
-import org.apache.samza.config.StorageConfig.Config2Storage
+import org.apache.samza.util.Util
import scala.collection.JavaConversions._
import org.apache.samza.config.JobConfig.Config2Job
import org.apache.samza.config.TaskConfig.Config2Task
-import org.apache.samza.config.SystemConfig.Config2System
import org.apache.samza.Partition
-import org.apache.samza.job.model.TaskModel
import org.apache.samza.system.StreamMetadataCache
import org.apache.samza.system.SystemStreamPartition
-import org.apache.samza.serializers.model.SamzaObjectMapper
-import java.net.URL
import org.apache.samza.system.SystemFactory
import org.apache.samza.coordinator.server.HttpServer
-import org.apache.samza.checkpoint.CheckpointManager
+import org.apache.samza.checkpoint.{Checkpoint, CheckpointManager}
import org.apache.samza.coordinator.server.JobServlet
+import org.apache.samza.config.SystemConfig.Config2System
+import org.apache.samza.coordinator.stream.{CoordinatorStreamSystemConsumer, CoordinatorStreamSystemProducer, CoordinatorStreamMessage, CoordinatorStreamSystemFactory}
+import org.apache.samza.config.ConfigRewriter
+/**
+ * Helper companion object that is responsible for wiring up a JobCoordinator
+ * given a Config object.
+ */
object JobCoordinator extends Logging {
+
/**
- * Build a JobCoordinator using a Samza job's configuration.
+ * @param coordinatorSystemConfig A config object that contains job.name,
+ * job.id, and all system.<job-coordinator-system-name>.*
+ * configuration. The method will use this config to read all configuration
+ * from the coordinator stream, and instantiate a JobCoordinator.
*/
- def apply(config: Config, containerCount: Int) = {
- val jobModel = buildJobModel(config, containerCount)
- val server = new HttpServer
- server.addServlet("/*", new JobServlet(jobModel))
- new JobCoordinator(jobModel, server)
+ def apply(coordinatorSystemConfig: Config, metricsRegistryMap: MetricsRegistryMap): JobCoordinator = {
+ val coordinatorStreamSystemFactory: CoordinatorStreamSystemFactory = new CoordinatorStreamSystemFactory()
+ val coordinatorSystemConsumer = coordinatorStreamSystemFactory.getCoordinatorStreamSystemConsumer(coordinatorSystemConfig, metricsRegistryMap)
+ val coordinatorSystemProducer = coordinatorStreamSystemFactory.getCoordinatorStreamSystemProducer(coordinatorSystemConfig, metricsRegistryMap)
+ info("Registering coordinator system stream.")
+ coordinatorSystemConsumer.register
+ debug("Starting coordinator system stream.")
+ coordinatorSystemConsumer.start
+ debug("Bootstrapping coordinator system stream.")
+ coordinatorSystemConsumer.bootstrap
+ debug("Stopping coordinator system stream.")
+ coordinatorSystemConsumer.stop
+ val config = coordinatorSystemConsumer.getConfig
+ info("Got config: %s" format config)
+ val checkpointManager = new CheckpointManager(coordinatorSystemProducer, coordinatorSystemConsumer, "Job-coordinator")
+ val changelogManager = new ChangelogPartitionManager(coordinatorSystemProducer, coordinatorSystemConsumer, "Job-coordinator")
+ getJobCoordinator(rewriteConfig(config), checkpointManager, changelogManager)
}
+ def apply(coordinatorSystemConfig: Config): JobCoordinator = apply(coordinatorSystemConfig, new MetricsRegistryMap())
+
/**
- * Gets a CheckpointManager from the configuration.
+ * Build a JobCoordinator using a Samza job's configuration.
*/
- def getCheckpointManager(config: Config) = {
- config.getCheckpointManagerFactory match {
- case Some(checkpointFactoryClassName) =>
- Util
- .getObj[CheckpointManagerFactory](checkpointFactoryClassName)
- .getCheckpointManager(config, new MetricsRegistryMap)
- case _ =>
- if (!config.getStoreNames.isEmpty) {
- throw new SamzaException("Storage factories configured, but no checkpoint manager has been specified. " +
- "Unable to start job as there would be no place to store changelog partition mapping.")
- }
- null
- }
+ def getJobCoordinator(config: Config, checkpointManager: CheckpointManager, changelogManager: ChangelogPartitionManager) = {
+ val containerCount = config.getContainerCount
+ val jobModelGenerator = initializeJobModel(config, containerCount, checkpointManager, changelogManager)
+ val server = new HttpServer
+ server.addServlet("/*", new JobServlet(jobModelGenerator))
+ new JobCoordinator(jobModelGenerator(), server, checkpointManager)
}
/**
@@ -115,74 +128,135 @@ object JobCoordinator extends Logging {
}
/**
- * Build a full Samza job model using the job configuration.
+ * Re-writes configuration using a ConfigRewriter, if one is defined. If
+ * there is no ConfigRewriter defined for the job, then this method is a
+ * no-op.
+ *
+ * @param config The config to re-write.
*/
- def buildJobModel(config: Config, containerCount: Int) = {
- // TODO containerCount should go away when we generalize the job coordinator,
+ def rewriteConfig(config: Config): Config = {
+ def rewrite(c: Config, rewriterName: String): Config = {
+ val klass = config
+ .getConfigRewriterClass(rewriterName)
+ .getOrElse(throw new SamzaException("Unable to find class config for config rewriter %s." format rewriterName))
+ val rewriter = Util.getObj[ConfigRewriter](klass)
+ info("Re-writing config with " + rewriter)
+ rewriter.rewrite(rewriterName, c)
+ }
+
+ config.getConfigRewriters match {
+ case Some(rewriters) => rewriters.split(",").foldLeft(config)(rewrite(_, _))
+ case _ => config
+ }
+ }
+
+ /**
+ * The method initializes the jobModel and creates a JobModel generator which can be used to generate new JobModels
+ * which catch up with the latest content from the coordinator stream.
+ */
+ private def initializeJobModel(config: Config,
+ containerCount: Int,
+ checkpointManager: CheckpointManager,
+ changelogManager: ChangelogPartitionManager): () => JobModel = {
+ // TODO containerCount should go away when we generalize the job coordinator,
// and have a non-yarn-specific way of specifying container count.
- val checkpointManager = getCheckpointManager(config)
+
+ // Do grouping to fetch TaskName to SSP mapping
val allSystemStreamPartitions = getInputStreamPartitions(config)
val grouper = getSystemStreamPartitionGrouper(config)
- val previousChangelogeMapping = if (checkpointManager != null) {
- checkpointManager.start
- checkpointManager.readChangeLogPartitionMapping
- } else {
- new util.HashMap[TaskName, java.lang.Integer]()
+ info("SystemStreamPartitionGrouper " + grouper + " has grouped the SystemStreamPartitions into the following taskNames:")
+ val groups = grouper.group(allSystemStreamPartitions)
+
+ // Initialize the ChangelogPartitionManager and the CheckpointManager
+ val previousChangelogeMapping = if (changelogManager != null)
+ {
+ changelogManager.start
+ changelogManager.readChangeLogPartitionMapping
}
- var maxChangelogPartitionId = previousChangelogeMapping
- .values
- .map(_.toInt)
- .toList
- .sorted
- .lastOption
- .getOrElse(-1)
-
- // Assign all SystemStreamPartitions to TaskNames.
- val taskModels = {
- val groups = grouper.group(allSystemStreamPartitions)
- info("SystemStreamPartitionGrouper " + grouper + " has grouped the SystemStreamPartitions into the following taskNames:")
- groups
- .map {
- case (taskName, systemStreamPartitions) =>
- val changelogPartition = Option(previousChangelogeMapping.get(taskName)) match {
- case Some(changelogPartitionId) => new Partition(changelogPartitionId)
- case _ =>
- // If we've never seen this TaskName before, then assign it a
- // new changelog.
- maxChangelogPartitionId += 1
- info("New task %s is being assigned changelog partition %s." format (taskName, maxChangelogPartitionId))
- new Partition(maxChangelogPartitionId)
- }
- new TaskModel(taskName, systemStreamPartitions, changelogPartition)
- }
- .toSet
+ else
+ {
+ new util.HashMap[TaskName, java.lang.Integer]()
}
+ checkpointManager.start
+ groups.foreach(taskSSP => checkpointManager.register(taskSSP._1))
+
+ // Generate the jobModel
+ def jobModelGenerator(): JobModel = refreshJobModel(config,
+ allSystemStreamPartitions,
+ checkpointManager,
+ groups,
+ previousChangelogeMapping,
+ containerCount)
+
+ val jobModel = jobModelGenerator()
- // Save the changelog mapping back to the checkpoint manager.
- if (checkpointManager != null) {
- // newChangelogMapping is the merging of all current task:changelog
+ // Save the changelog mapping back to the ChangelogPartitionManager
+ if (changelogManager != null)
+ {
+ // newChangelogMapping is the merging of all current task:changelog
// assignments with whatever we had before (previousChangelogeMapping).
- // We must persist legacy changelog assignments so that
- // maxChangelogPartitionId always has the absolute max, not the current
- // max (in case the task with the highest changelog partition mapping
+ // We must persist legacy changelog assignments so that
+ // maxChangelogPartitionId always has the absolute max, not the current
+ // max (in case the task with the highest changelog partition mapping
// disappears.
- val newChangelogMapping = taskModels.map(taskModel => {
- taskModel.getTaskName -> Integer.valueOf(taskModel.getChangelogPartition.getPartitionId)
- }).toMap ++ previousChangelogeMapping
+ val newChangelogMapping = jobModel.getContainers.flatMap(_._2.getTasks).map{case (taskName,taskModel) => {
+ taskName -> Integer.valueOf(taskModel.getChangelogPartition.getPartitionId)
+ }}.toMap ++ previousChangelogeMapping
info("Saving task-to-changelog partition mapping: %s" format newChangelogMapping)
- checkpointManager.writeChangeLogPartitionMapping(newChangelogMapping)
- checkpointManager.stop
+ changelogManager.writeChangeLogPartitionMapping(newChangelogMapping)
}
+ // Return a jobModelGenerator lambda that can be used to refresh the job model
+ jobModelGenerator
+ }
- // Here is where we should put in a pluggable option for the
- // SSPTaskNameGrouper for locality, load-balancing, etc.
- val containerGrouper = new GroupByContainerCount(containerCount)
- val containerModels = containerGrouper
- .group(taskModels)
- .map { case (containerModel) => Integer.valueOf(containerModel.getContainerId) -> containerModel }
- .toMap
+ /**
+ * Build a full Samza job model. The function reads the latest checkpoint from the underlying coordinator stream and
+ * builds a new JobModel.
+ * This method needs to be thread-safe because it is called for every HTTP request from a container,
+ * and under the hood it uses the same instance of coordinator stream producer and coordinator stream consumer.
+ */
+ private def refreshJobModel(config: Config,
+ allSystemStreamPartitions: util.Set[SystemStreamPartition],
+ checkpointManager: CheckpointManager,
+ groups: util.Map[TaskName, util.Set[SystemStreamPartition]],
+ previousChangelogeMapping: util.Map[TaskName, Integer],
+ containerCount: Int): JobModel = {
+ this.synchronized
+ {
+ // If no mappings are present (first time the job is running) we return -1; this will allow 0 to be the first
+ // changelog partition mapping.
+ var maxChangelogPartitionId = previousChangelogeMapping.values.map(_.toInt).toList.sorted.lastOption.getOrElse(-1)
+
+ // Assign all SystemStreamPartitions to TaskNames.
+ val taskModels =
+ {
+ groups.map
+ { case (taskName, systemStreamPartitions) =>
+ val checkpoint = Option(checkpointManager.readLastCheckpoint(taskName)).getOrElse(new Checkpoint(new util.HashMap[SystemStreamPartition, String]()))
+ // Find the system partitions which don't have a checkpoint and set null for the values for offsets
+ val offsetMap = systemStreamPartitions.map(ssp => (ssp -> null)).toMap ++ checkpoint.getOffsets
+ val changelogPartition = Option(previousChangelogeMapping.get(taskName)) match
+ {
+ case Some(changelogPartitionId) => new Partition(changelogPartitionId)
+ case _ =>
+ // If we've never seen this TaskName before, then assign it a
+ // new changelog.
+ maxChangelogPartitionId += 1
+ info("New task %s is being assigned changelog partition %s." format(taskName, maxChangelogPartitionId))
+ new Partition(maxChangelogPartitionId)
+ }
+ new TaskModel(taskName, offsetMap, changelogPartition)
+ }.toSet
+ }
+
+ // Here is where we should put in a pluggable option for the
+ // SSPTaskNameGrouper for locality, load-balancing, etc.
+ val containerGrouper = new GroupByContainerCount(containerCount)
+ val containerModels = containerGrouper.group(taskModels).map
+ { case (containerModel) => Integer.valueOf(containerModel.getContainerId) -> containerModel }.toMap
- new JobModel(config, containerModels)
+ new JobModel(config, containerModels)
+ }
}
}
@@ -207,19 +281,31 @@ class JobCoordinator(
/**
* HTTP server used to serve a Samza job's container model to SamzaContainers when they start up.
*/
- val server: HttpServer) extends Logging {
+ val server: HttpServer = null,
+
+ /**
+ * Handle to checkpoint manager that's used to refresh the JobModel
+ */
+ val checkpointManager: CheckpointManager) extends Logging {
debug("Got job model: %s." format jobModel)
def start {
- debug("Starting HTTP server.")
- server.start
- info("Startd HTTP server: %s" format server.getUrl)
+ if (server != null) {
+ debug("Starting HTTP server.")
+ server.start
+ info("Startd HTTP server: %s" format server.getUrl)
+ }
}
def stop {
- debug("Stopping HTTP server.")
- server.stop
- info("Stopped HTTP server.")
+ if (server != null) {
+ debug("Stopping HTTP server.")
+ server.stop
+ info("Stopped HTTP server.")
+ debug("Stopping checkpoint manager.")
+ checkpointManager.stop()
+ info("Stopped checkpoint manager.")
+ }
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/main/scala/org/apache/samza/coordinator/server/HttpServer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/server/HttpServer.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/server/HttpServer.scala
index 10986a4..dfe3a45 100644
--- a/samza-core/src/main/scala/org/apache/samza/coordinator/server/HttpServer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/coordinator/server/HttpServer.scala
@@ -20,8 +20,6 @@
package org.apache.samza.coordinator.server;
import java.net.InetAddress
-import java.net.URI
-import java.net.UnknownHostException
import javax.servlet.Servlet
import org.apache.samza.SamzaException
import org.eclipse.jetty.server.Connector
@@ -32,6 +30,7 @@ import org.eclipse.jetty.servlet.ServletHolder
import java.net.URL
import org.apache.samza.util.Logging
+
/**
* <p>A Jetty-based HTTP server. The server allows arbitrary servlets to be added
* with the addServlet() method. The server is configured to automatically
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/main/scala/org/apache/samza/coordinator/server/JobServlet.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/server/JobServlet.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/server/JobServlet.scala
index 635c353..a3baddb 100644
--- a/samza-core/src/main/scala/org/apache/samza/coordinator/server/JobServlet.scala
+++ b/samza-core/src/main/scala/org/apache/samza/coordinator/server/JobServlet.scala
@@ -19,12 +19,13 @@
package org.apache.samza.coordinator.server
+
import org.apache.samza.job.model.JobModel
import org.apache.samza.util.Logging
/**
* A servlet that dumps the job model for a Samza job.
*/
-class JobServlet(jobModel: JobModel) extends ServletBase with Logging {
- protected def getObjectToWrite() = jobModel
-}
+class JobServlet(jobModelGenerator: () => JobModel) extends ServletBase with Logging {
+ protected def getObjectToWrite() = jobModelGenerator()
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/main/scala/org/apache/samza/coordinator/stream/CoordinatorStreamSystemFactory.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/stream/CoordinatorStreamSystemFactory.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/stream/CoordinatorStreamSystemFactory.scala
new file mode 100644
index 0000000..9283812
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/coordinator/stream/CoordinatorStreamSystemFactory.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator.stream
+
+import org.apache.samza.SamzaException
+import org.apache.samza.config.JobConfig.Config2Job
+import org.apache.samza.config.{Config, SystemConfig}
+import org.apache.samza.config.SystemConfig.Config2System
+import org.apache.samza.metrics.MetricsRegistry
+import org.apache.samza.system.{SystemFactory, SystemStream}
+import org.apache.samza.util.Util
+
+/**
+ * A helper class that does wiring for CoordinatorStreamSystemConsumer and
+ * CoordinatorStreamSystemProducer. This factory should only be used in
+ * situations where the underlying SystemConsumer/SystemProducer does not
+ * exist.
+ */
+class CoordinatorStreamSystemFactory {
+ def getCoordinatorStreamSystemConsumer(config: Config, registry: MetricsRegistry) = {
+ val (coordinatorSystemStream, systemFactory) = Util.getCoordinatorSystemStreamAndFactory(config)
+ val systemAdmin = systemFactory.getAdmin(coordinatorSystemStream.getSystem, config)
+ val systemConsumer = systemFactory.getConsumer(coordinatorSystemStream.getSystem, config, registry)
+ new CoordinatorStreamSystemConsumer(coordinatorSystemStream, systemConsumer, systemAdmin)
+ }
+
+ def getCoordinatorStreamSystemProducer(config: Config, registry: MetricsRegistry) = {
+ val (coordinatorSystemStream, systemFactory) = Util.getCoordinatorSystemStreamAndFactory(config)
+ val systemAdmin = systemFactory.getAdmin(coordinatorSystemStream.getSystem, config)
+ val systemProducer = systemFactory.getProducer(coordinatorSystemStream.getSystem, config, registry)
+ new CoordinatorStreamSystemProducer(coordinatorSystemStream, systemProducer, systemAdmin)
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
index 0b720ec..1c178a6 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
@@ -20,16 +20,27 @@
package org.apache.samza.job
import org.apache.samza.SamzaException
-import org.apache.samza.config.{Config, ConfigRewriter}
+import org.apache.samza.config.Config
import org.apache.samza.config.JobConfig.Config2Job
-import org.apache.samza.config.factories.PropertiesConfigFactory
import org.apache.samza.job.ApplicationStatus.Running
-import org.apache.samza.util.Util
import org.apache.samza.util.CommandLine
import org.apache.samza.util.Logging
+import org.apache.samza.util.Util
import scala.collection.JavaConversions._
+import org.apache.samza.config.SystemConfig.Config2System
+import org.apache.samza.config.ConfigException
+import org.apache.samza.config.SystemConfig
+import org.apache.samza.system.SystemFactory
+import org.apache.samza.metrics.MetricsRegistryMap
+import org.apache.samza.coordinator.stream.CoordinatorStreamMessage
+import org.apache.samza.coordinator.stream.CoordinatorStreamMessage.SetConfig
+import org.apache.samza.coordinator.stream.CoordinatorStreamSystemProducer
+import org.apache.samza.system.SystemStream
+import org.apache.samza.coordinator.stream.CoordinatorStreamSystemFactory
+
+object JobRunner {
+ val SOURCE = "job-runner"
-object JobRunner extends Logging {
def main(args: Array[String]) {
val cmdline = new CommandLine
val options = cmdline.parser.parse(args: _*)
@@ -43,23 +54,43 @@ object JobRunner extends Logging {
* on a config URI. The configFactory is instantiated, fed the configPath,
* and returns a Config, which is used to execute the job.
*/
-class JobRunner(config: Config) extends Logging with Runnable {
-
- def run() {
- val conf = rewriteConfig(config)
-
- val jobFactoryClass = conf.getStreamJobFactoryClass match {
+class JobRunner(config: Config) extends Logging {
+ def run() = {
+ debug("config: %s" format (config))
+ val jobFactoryClass = config.getStreamJobFactoryClass match {
case Some(factoryClass) => factoryClass
case _ => throw new SamzaException("no job factory class defined")
}
-
val jobFactory = Class.forName(jobFactoryClass).newInstance.asInstanceOf[StreamJobFactory]
-
info("job factory: %s" format (jobFactoryClass))
- debug("config: %s" format (conf))
+ val factory = new CoordinatorStreamSystemFactory
+ val coordinatorSystemConsumer = factory.getCoordinatorStreamSystemConsumer(config, new MetricsRegistryMap)
+ val coordinatorSystemProducer = factory.getCoordinatorStreamSystemProducer(config, new MetricsRegistryMap)
+
+ // Create the coordinator stream if it doesn't exist
+ info("Creating coordinator stream");
+ val (coordinatorSystemStream, systemFactory) = Util.getCoordinatorSystemStreamAndFactory(config)
+ val systemAdmin = systemFactory.getAdmin(coordinatorSystemStream.getSystem, config)
+ systemAdmin.createCoordinatorStream(coordinatorSystemStream.getStream)
+
+ info("Storing config in coordinator stream.")
+ coordinatorSystemProducer.register(JobRunner.SOURCE)
+ coordinatorSystemProducer.start
+ coordinatorSystemProducer.writeConfig(JobRunner.SOURCE, config)
+ info("Loading old config from coordinator stream.")
+ coordinatorSystemConsumer.register
+ coordinatorSystemConsumer.start
+ coordinatorSystemConsumer.bootstrap
+ coordinatorSystemConsumer.stop
+ val oldConfig = coordinatorSystemConsumer.getConfig();
+ info("Deleting old configs that are no longer defined: %s".format(oldConfig.keySet -- config.keySet))
+ (oldConfig.keySet -- config.keySet).foreach(key => {
+ coordinatorSystemProducer.send(new CoordinatorStreamMessage.Delete(JobRunner.SOURCE, key, SetConfig.TYPE))
+ })
+ coordinatorSystemProducer.stop
// Create the actual job, and submit it.
- val job = jobFactory.getJob(conf).submit
+ val job = jobFactory.getJob(config).submit
info("waiting for job to start")
@@ -76,22 +107,6 @@ class JobRunner(config: Config) extends Logging with Runnable {
}
info("exiting")
- }
-
- // Apply any and all config re-writer classes that the user has specified
- def rewriteConfig(config: Config): Config = {
- def rewrite(c: Config, rewriterName: String): Config = {
- val klass = config
- .getConfigRewriterClass(rewriterName)
- .getOrElse(throw new SamzaException("Unable to find class config for config rewriter %s." format rewriterName))
- val rewriter = Util.getObj[ConfigRewriter](klass)
- info("Re-writing config file with " + rewriter)
- rewriter.rewrite(rewriterName, c)
- }
-
- config.getConfigRewriters match {
- case Some(rewriters) => rewriters.split(",").foldLeft(config)(rewrite(_, _))
- case None => config
- }
+ job
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala b/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
index fd9719a..4fac154 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
@@ -30,7 +30,7 @@ import org.apache.samza.util.{Logging, Util}
*/
class ProcessJobFactory extends StreamJobFactory with Logging {
def getJob(config: Config): StreamJob = {
- val coordinator = JobCoordinator(config, 1)
+ val coordinator = JobCoordinator(config)
val containerModel = coordinator.jobModel.getContainers.get(0)
val commandBuilder = {
@@ -48,11 +48,12 @@ class ProcessJobFactory extends StreamJobFactory with Logging {
}
// JobCoordinator is stopped by ProcessJob when it exits
coordinator.start
+ val coordinatorSystemConfig = Util.buildCoordinatorStreamConfig(config)
commandBuilder
- .setConfig(config)
- .setId(0)
- .setUrl(coordinator.server.getUrl)
+ .setConfig(coordinatorSystemConfig)
+ .setId(0)
+ .setUrl(coordinator.server.getUrl)
new ProcessJob(commandBuilder, coordinator)
}
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
index 530255e..60ee36f 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
@@ -19,6 +19,8 @@
package org.apache.samza.job.local
+
+import org.apache.samza.metrics.MetricsRegistryMap
import org.apache.samza.util.Logging
import org.apache.samza.SamzaException
import org.apache.samza.config.Config
@@ -26,7 +28,6 @@ import org.apache.samza.config.ShellCommandConfig._
import org.apache.samza.config.TaskConfig._
import org.apache.samza.container.SamzaContainer
import org.apache.samza.job.{ StreamJob, StreamJobFactory }
-import org.apache.samza.util.Util
import org.apache.samza.config.JobConfig._
import org.apache.samza.coordinator.JobCoordinator
@@ -36,7 +37,7 @@ import org.apache.samza.coordinator.JobCoordinator
class ThreadJobFactory extends StreamJobFactory with Logging {
def getJob(config: Config): StreamJob = {
info("Creating a ThreadJob, which is only meant for debugging.")
- val coordinator = JobCoordinator(config, 1)
+ val coordinator = JobCoordinator(config)
val containerModel = coordinator.jobModel.getContainers.get(0)
// Give developers a nice friendly warning if they've specified task.opts and are using a threaded job.
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/main/scala/org/apache/samza/serializers/JsonSerde.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/serializers/JsonSerde.scala b/samza-core/src/main/scala/org/apache/samza/serializers/JsonSerde.scala
index 744eec0..4b658c1 100644
--- a/samza-core/src/main/scala/org/apache/samza/serializers/JsonSerde.scala
+++ b/samza-core/src/main/scala/org/apache/samza/serializers/JsonSerde.scala
@@ -18,19 +18,33 @@
*/
package org.apache.samza.serializers
+
+import org.apache.samza.SamzaException
+import org.apache.samza.serializers.model.SamzaObjectMapper
+import org.codehaus.jackson.`type`.TypeReference
import org.codehaus.jackson.map.ObjectMapper
-import java.nio.ByteBuffer
import org.apache.samza.config.Config
-class JsonSerde extends Serde[Object] {
- val objectMapper = new ObjectMapper
+class JsonSerde[T] extends Serde[T] {
+ val mapper = SamzaObjectMapper.getObjectMapper()
+
+ def toBytes(obj: T): Array[Byte] = {
+ try {
+ mapper.writeValueAsString(obj).getBytes("UTF-8")
+ }
+ catch {
+ case e: Exception => throw new SamzaException(e);
+ }
+ }
- def toBytes(obj: Object) = objectMapper
- .writeValueAsString(obj)
- .getBytes("UTF-8")
+ def fromBytes(bytes: Array[Byte]): T = {
+ try {
+ mapper.readValue(new String(bytes, "UTF-8"), new TypeReference[T]() {})}
+ catch {
+ case e: Exception => throw new SamzaException(e);
+ }
+ }
- def fromBytes(bytes: Array[Byte]) = objectMapper
- .readValue(bytes, classOf[Object])
}
class JsonSerdeFactory extends SerdeFactory[Object] {
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala b/samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala
index ec1d749..097f410 100644
--- a/samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala
+++ b/samza-core/src/main/scala/org/apache/samza/system/filereader/FileReaderSystemAdmin.scala
@@ -131,7 +131,11 @@ class FileReaderSystemAdmin extends SystemAdmin with Logging {
enterPosition
}
- override def createChangelogStream(topicName: String, numOfChangeLogPartitions: Int) = {
- throw new SamzaException("Method not implemented")
+ def createChangelogStream(topicName: String, numOfChangeLogPartitions: Int) = {
+ throw new UnsupportedOperationException("Method not implemented.")
+ }
+
+ def createCoordinatorStream(streamName: String) {
+ throw new UnsupportedOperationException("Method not implemented.")
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/main/scala/org/apache/samza/util/Util.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/util/Util.scala b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
index 1a67586..8a83566 100644
--- a/samza-core/src/main/scala/org/apache/samza/util/Util.scala
+++ b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
@@ -19,14 +19,20 @@
package org.apache.samza.util
-import java.net.URL
-import java.io.BufferedReader
+import java.net.{HttpURLConnection, URL}
+import java.io.{InputStream, BufferedReader, File, InputStreamReader}
import java.lang.management.ManagementFactory
-import java.io.File
-import org.apache.samza.system.SystemStream
+import org.apache.samza.{SamzaException, Partition}
+import org.apache.samza.system.{SystemFactory, SystemStreamPartition, SystemStream}
import java.util.Random
-import org.apache.samza.job.model.JobModel
-import java.io.InputStreamReader
+import org.apache.samza.config.Config
+import org.apache.samza.config.SystemConfig
+import org.apache.samza.config.JobConfig.Config2Job
+import org.apache.samza.config.SystemConfig.Config2System
+import org.apache.samza.config.ConfigException
+import org.apache.samza.config.MapConfig
+import scala.collection.JavaConversions._
+import org.apache.samza.config.JobConfig
object Util extends Logging {
val random = new Random
@@ -114,14 +120,121 @@ object Util extends Logging {
* @param timeout How long to wait before timing out when connecting to or reading from the HTTP server.
* @return String payload of the body of the HTTP response.
*/
- def read(url: URL, timeout: Int = 30000): String = {
- val conn = url.openConnection();
+ def read(url: URL, timeout: Int = 60000): String = {
+ var httpConn = getHttpConnection(url, timeout)
+ val retryBackoff: ExponentialSleepStrategy = new ExponentialSleepStrategy
+ retryBackoff.run(loop => {
+ if(httpConn.getResponseCode != 200)
+ {
+ warn("Error: " + httpConn.getResponseCode)
+ val errorContent = readStream(httpConn.getErrorStream)
+ warn("Error reading stream, failed with response %s" format errorContent)
+ httpConn = getHttpConnection(url, timeout)
+ }
+ else
+ {
+ loop.done
+ }
+ },
+ (exception, loop) => {
+ exception match {
+ case e: Exception =>
+ loop.done
+ error("Unable to connect to Job coordinator server, received exception", e)
+ throw e
+ }
+ })
+
+ if(httpConn.getResponseCode != 200) {
+ throw new SamzaException("Unable to read JobModel from Jobcoordinator HTTP server")
+ }
+ readStream(httpConn.getInputStream)
+ }
+
+ private def getHttpConnection(url: URL, timeout: Int): HttpURLConnection = {
+ val conn = url.openConnection()
conn.setConnectTimeout(timeout)
conn.setReadTimeout(timeout)
- val br = new BufferedReader(new InputStreamReader(conn.getInputStream));
+ conn.asInstanceOf[HttpURLConnection]
+ }
+ private def readStream(stream: InputStream): String = {
+ val br = new BufferedReader(new InputStreamReader(stream));
var line: String = null;
val body = Iterator.continually(br.readLine()).takeWhile(_ != null).mkString
br.close
+ stream.close
body
}
+
+
+ /**
+ * Generates a coordinator stream name based off of the job name and job id
+ * for the job. The format of the stream name will be
+ * __samza_coordinator_<JOBNAME>_<JOBID>.
+ */
+ def getCoordinatorStreamName(jobName: String, jobId: String) = {
+ "__samza_coordinator_%s_%s" format (jobName.replaceAll("_", "-"), jobId.replaceAll("_", "-"))
+ }
+
+ /**
+ * Get a job's name and ID given a config. Job ID is defaulted to 1 if not
+ * defined in the config, and job name must be defined in config.
+ *
+ * @return A tuple of (jobName, jobId)
+ */
+ def getJobNameAndId(config: Config) = {
+ (config.getName.getOrElse(throw new ConfigException("Missing required config: job.name")), config.getJobId.getOrElse("1"))
+ }
+
+ /**
+ * Given a job's full config object, build a subset config which includes
+ * only the job name, job id, and system config for the coordinator stream.
+ */
+ def buildCoordinatorStreamConfig(config: Config) = {
+ val (jobName, jobId) = getJobNameAndId(config)
+ // Build a map with just the system config and job.name/job.id. This is what's required to start the JobCoordinator.
+ new MapConfig(config.subset(SystemConfig.SYSTEM_PREFIX format config.getCoordinatorSystemName, false) ++
+ Map[String, String](JobConfig.JOB_NAME -> jobName, JobConfig.JOB_ID -> jobId, JobConfig.JOB_COORDINATOR_SYSTEM -> config.getCoordinatorSystemName))
+ }
+
+ /**
+ * Get the Coordinator System and system factory from the configuration
+ * @param config
+ * @return
+ */
+ def getCoordinatorSystemStreamAndFactory(config: Config) = {
+ val systemName = config.getCoordinatorSystemName
+ val (jobName, jobId) = Util.getJobNameAndId(config)
+ val streamName = Util.getCoordinatorStreamName(jobName, jobId)
+ val coordinatorSystemStream = new SystemStream(systemName, streamName)
+ val systemFactoryClassName = config
+ .getSystemFactory(systemName)
+ .getOrElse(throw new SamzaException("Missing configuration: " + SystemConfig.SYSTEM_FACTORY format systemName))
+ val systemFactory = Util.getObj[SystemFactory](systemFactoryClassName)
+ (coordinatorSystemStream, systemFactory)
+ }
+
+ /**
+ * The helper function converts an SSP to a string
+ * @param ssp System stream partition
+ * @return The string representation of the SSP
+ */
+ def sspToString(ssp: SystemStreamPartition): String = {
+ ssp.getSystem() + "." + ssp.getStream() + "." + String.valueOf(ssp.getPartition().getPartitionId())
+ }
+
+ /**
+ * The method converts the string SSP back to a SSP
+ * @param ssp The string form of the SSP
+ * @return An SSP typed object
+ */
+ def stringToSsp(ssp: String): SystemStreamPartition = {
+ val idx = ssp.indexOf('.');
+ val lastIdx = ssp.lastIndexOf('.')
+ if (idx < 0 || lastIdx < 0) {
+ throw new IllegalArgumentException("System stream partition expected in format 'system.stream.partition")
+ }
+ new SystemStreamPartition(new SystemStream(ssp.substring(0, idx), ssp.substring(idx + 1, lastIdx)),
+ new Partition(Integer.parseInt(ssp.substring(lastIdx + 1))))
+ }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java b/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java
new file mode 100644
index 0000000..59782fe
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator.stream;
+
+import org.apache.samza.Partition;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.ConfigException;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.SystemConsumer;
+import org.apache.samza.system.SystemFactory;
+import org.apache.samza.system.SystemProducer;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.SinglePartitionWithoutOffsetsSystemAdmin;
+import org.apache.samza.util.Util;
+
+/**
+ * Helper for creating mock CoordinatorStreamConsumer and
+ * CoordinatorStreamProducer. The CoordinatorStreamConsumer is meant to just
+ * forward all configs to JobCoordinator, which is useful for mocking in unit
+ * tests.
+ */
+public class MockCoordinatorStreamSystemFactory implements SystemFactory {
+
+ private static SystemConsumer mockConsumer = null;
+ private static boolean useCachedConsumer = false;
+ public static void enableMockConsumerCache() {
+ mockConsumer = null;
+ useCachedConsumer = true;
+ }
+
+ public static void disableMockConsumerCache() {
+ useCachedConsumer = false;
+ mockConsumer = null;
+ }
+
+ /**
+ * Returns a consumer that sends all configs to the coordinator stream.
+ * @param config Along with the configs, you can pass checkpoints and changelog stream messages into the stream.
+ * The expected pattern is cp:source:taskname -> ssp,offset for checkpoint (Use sspToString util)
+ * ch:source:taskname -> changelogPartition for changelog
+ * Everything else is processed as normal config
+ */
+ public SystemConsumer getConsumer(String systemName, Config config, MetricsRegistry registry) {
+
+ if(useCachedConsumer && mockConsumer != null) {
+ return mockConsumer;
+ }
+
+ String jobName = config.get("job.name");
+ String jobId = config.get("job.id");
+ if (jobName == null) {
+ throw new ConfigException("Must define job.name.");
+ }
+ if (jobId == null) {
+ jobId = "1";
+ }
+ String streamName = Util.getCoordinatorStreamName(jobName, jobId);
+ SystemStreamPartition systemStreamPartition = new SystemStreamPartition(systemName, streamName, new Partition(0));
+ mockConsumer = new MockCoordinatorStreamWrappedConsumer(systemStreamPartition, config);
+ return mockConsumer;
+ }
+
+ /**
+ * Returns a no-op producer.
+ */
+ public SystemProducer getProducer(String systemName, Config config, MetricsRegistry registry) {
+ // A do-nothing producer.
+ return new SystemProducer() {
+ public void start() {
+ }
+
+ public void stop() {
+ }
+
+ public void register(String source) {
+ }
+
+ public void send(String source, OutgoingMessageEnvelope envelope) {
+ }
+
+ public void flush(String source) {
+ }
+ };
+ }
+
+ /**
+ * Returns a single partition admin that pretends to create a coordinator
+ * stream.
+ */
+ public SystemAdmin getAdmin(String systemName, Config config) {
+ return new MockSystemAdmin();
+ }
+
+ public static final class MockSystemAdmin extends SinglePartitionWithoutOffsetsSystemAdmin implements SystemAdmin {
+ public void createCoordinatorStream(String streamName) {
+ // Do nothing.
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamWrappedConsumer.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamWrappedConsumer.java b/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamWrappedConsumer.java
new file mode 100644
index 0000000..00a2d59
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamWrappedConsumer.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator.stream;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import org.apache.samza.SamzaException;
+import org.apache.samza.checkpoint.Checkpoint;
+import org.apache.samza.config.Config;
+import org.apache.samza.coordinator.stream.CoordinatorStreamMessage.SetConfig;
+import org.apache.samza.serializers.model.SamzaObjectMapper;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.BlockingEnvelopeMap;
+import org.apache.samza.util.Util;
+import org.codehaus.jackson.map.ObjectMapper;
+
+/**
+ * A mock SystemConsumer that pretends to be a coordinator stream. The mock will
+ * take all configs given to it, and put them into the coordinator stream's
+ * SystemStreamPartition. This is useful in cases where config needs to be
+ * quickly passed from a unit test into the JobCoordinator.
+ */
+public class MockCoordinatorStreamWrappedConsumer extends BlockingEnvelopeMap {
+ private final static ObjectMapper MAPPER = SamzaObjectMapper.getObjectMapper();
+ public final static String CHANGELOGPREFIX = "ch:";
+ public final static String CHECKPOINTPREFIX = "cp:";
+ public final CountDownLatch blockConsumerPoll = new CountDownLatch(1);
+ public boolean blockpollFlag = false;
+
+ private final SystemStreamPartition systemStreamPartition;
+ private final Config config;
+
+ public MockCoordinatorStreamWrappedConsumer(SystemStreamPartition systemStreamPartition, Config config) {
+ super();
+ this.config = config;
+ this.systemStreamPartition = systemStreamPartition;
+ }
+
+ public void start() {
+ convertConfigToCoordinatorMessage(config);
+ }
+
+ public void addMoreMessages(Config config) {
+ convertConfigToCoordinatorMessage(config);
+ }
+
+
+ private void convertConfigToCoordinatorMessage(Config config) {
+ try {
+ for (Map.Entry<String, String> configPair : config.entrySet()) {
+ byte[] keyBytes = null;
+ byte[] messgeBytes = null;
+ if(configPair.getKey().startsWith(CHECKPOINTPREFIX))
+ {
+ String[] checkpointInfo = configPair.getKey().split(":");
+ String[] sspOffsetPair = configPair.getValue().split(":");
+ HashMap<SystemStreamPartition, String> checkpointMap = new HashMap<SystemStreamPartition, String>();
+ checkpointMap.put(Util.stringToSsp(sspOffsetPair[0]), sspOffsetPair[1]);
+ Checkpoint cp = new Checkpoint(checkpointMap);
+ CoordinatorStreamMessage.SetCheckpoint setCheckpoint = new CoordinatorStreamMessage.SetCheckpoint(checkpointInfo[1], checkpointInfo[2], cp);
+ keyBytes = MAPPER.writeValueAsString(setCheckpoint.getKeyArray()).getBytes("UTF-8");
+ messgeBytes = MAPPER.writeValueAsString(setCheckpoint.getMessageMap()).getBytes("UTF-8");
+ }
+ else if (configPair.getKey().startsWith(CHANGELOGPREFIX)) {
+ String[] changelogInfo = configPair.getKey().split(":");
+ String changeLogPartition = configPair.getValue();
+ CoordinatorStreamMessage.SetChangelogMapping changelogMapping = new CoordinatorStreamMessage.SetChangelogMapping(changelogInfo[1], changelogInfo[2], Integer.parseInt(changeLogPartition));
+ keyBytes = MAPPER.writeValueAsString(changelogMapping.getKeyArray()).getBytes("UTF-8");
+ messgeBytes = MAPPER.writeValueAsString(changelogMapping.getMessageMap()).getBytes("UTF-8");
+ }
+ else {
+ SetConfig setConfig = new SetConfig("source", configPair.getKey(), configPair.getValue());
+ keyBytes = MAPPER.writeValueAsString(setConfig.getKeyArray()).getBytes("UTF-8");
+ messgeBytes = MAPPER.writeValueAsString(setConfig.getMessageMap()).getBytes("UTF-8");
+ }
+ // The ssp here is the coordinator ssp (which is always fixed) and not the task ssp.
+ put(systemStreamPartition, new IncomingMessageEnvelope(systemStreamPartition, "", keyBytes, messgeBytes));
+ }
+ setIsAtHead(systemStreamPartition, true);
+ } catch (Exception e) {
+ throw new SamzaException(e);
+ }
+ }
+
+ @Override
+ public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll(
+ Set<SystemStreamPartition> systemStreamPartitions, long timeout)
+ throws InterruptedException {
+
+ if(blockpollFlag) {
+ blockConsumerPoll.await();
+ }
+
+ return super.poll(systemStreamPartitions, timeout);
+ }
+
+ public CountDownLatch blockPool()
+ {
+ blockpollFlag = true;
+ return blockConsumerPoll;
+ }
+
+
+ public void stop() {
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamMessage.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamMessage.java b/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamMessage.java
new file mode 100644
index 0000000..15181bb
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamMessage.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator.stream;
+
+import static org.junit.Assert.*;
+import org.apache.samza.coordinator.stream.CoordinatorStreamMessage.Delete;
+import org.apache.samza.coordinator.stream.CoordinatorStreamMessage.SetConfig;
+import org.junit.Test;
+
+public class TestCoordinatorStreamMessage {
+ @Test
+ public void testCoordinatorStreamMessage() {
+ CoordinatorStreamMessage message = new CoordinatorStreamMessage("source");
+ assertEquals("source", message.getSource());
+ assertEquals(CoordinatorStreamMessage.VERSION, message.getVersion());
+ assertNotNull(message.getUsername());
+ assertTrue(message.getTimestamp() > 0);
+ assertTrue(!message.isDelete());
+ CoordinatorStreamMessage secondMessage = new CoordinatorStreamMessage(message.getKeyArray(), message.getMessageMap());
+ assertEquals(secondMessage, message);
+ }
+
+ @Test
+ public void testCoordinatorStreamMessageIsDelete() {
+ CoordinatorStreamMessage message = new CoordinatorStreamMessage(new Object[] {}, null);
+ assertTrue(message.isDelete());
+ assertNull(message.getMessageMap());
+ }
+
+ @Test
+ public void testSetConfig() {
+ SetConfig setConfig = new SetConfig("source", "key", "value");
+ assertEquals(SetConfig.TYPE, setConfig.getType());
+ assertEquals("key", setConfig.getKey());
+ assertEquals("value", setConfig.getConfigValue());
+ assertFalse(setConfig.isDelete());
+ assertEquals(CoordinatorStreamMessage.VERSION, setConfig.getVersion());
+ }
+
+ @Test
+ public void testDelete() {
+ Delete delete = new Delete("source2", "key", "delete-type");
+ assertEquals("delete-type", delete.getType());
+ assertEquals("key", delete.getKey());
+ assertNull(delete.getMessageMap());
+ assertTrue(delete.isDelete());
+ assertEquals(CoordinatorStreamMessage.VERSION, delete.getVersion());
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java b/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java
new file mode 100644
index 0000000..5e193f8
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator.stream;
+
+import static org.junit.Assert.*;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
+import org.apache.samza.coordinator.stream.CoordinatorStreamMessage.Delete;
+import org.apache.samza.coordinator.stream.CoordinatorStreamMessage.SetConfig;
+import org.apache.samza.serializers.model.SamzaObjectMapper;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemConsumer;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.SinglePartitionWithoutOffsetsSystemAdmin;
+import org.junit.Test;
+
+public class TestCoordinatorStreamSystemConsumer {
+ @Test
+ public void testCoordinatorStreamSystemConsumer() {
+ Map<String, String> expectedConfig = new HashMap<String, String>();
+ expectedConfig.put("job.id", "1234");
+ SystemStream systemStream = new SystemStream("system", "stream");
+ MockSystemConsumer systemConsumer = new MockSystemConsumer(new SystemStreamPartition(systemStream, new Partition(0)));
+ CoordinatorStreamSystemConsumer consumer = new CoordinatorStreamSystemConsumer(systemStream, systemConsumer, new SinglePartitionWithoutOffsetsSystemAdmin());
+ assertFalse(systemConsumer.isRegistered());
+ consumer.register();
+ assertTrue(systemConsumer.isRegistered());
+ assertFalse(systemConsumer.isStarted());
+ consumer.start();
+ assertTrue(systemConsumer.isStarted());
+ try {
+ consumer.getConfig();
+ fail("Should have failed when retrieving config before bootstrapping.");
+ } catch (SamzaException e) {
+ // Expected.
+ }
+ consumer.bootstrap();
+ assertEquals(expectedConfig, consumer.getConfig());
+ assertFalse(systemConsumer.isStopped());
+ consumer.stop();
+ assertTrue(systemConsumer.isStopped());
+ }
+
+ private static class MockSystemConsumer implements SystemConsumer {
+ private boolean started = false;
+ private boolean stopped = false;
+ private boolean registered = false;
+ private final SystemStreamPartition expectedSystemStreamPartition;
+ private int pollCount = 0;
+
+ public MockSystemConsumer(SystemStreamPartition expectedSystemStreamPartition) {
+ this.expectedSystemStreamPartition = expectedSystemStreamPartition;
+ }
+
+ public void start() {
+ started = true;
+ }
+
+ public void stop() {
+ stopped = true;
+ }
+
+ public void register(SystemStreamPartition systemStreamPartition, String offset) {
+ registered = true;
+ assertEquals(expectedSystemStreamPartition, systemStreamPartition);
+ }
+
+ public boolean isRegistered() {
+ return registered;
+ }
+
+ public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll(Set<SystemStreamPartition> systemStreamPartitions, long timeout) throws InterruptedException {
+ Map<SystemStreamPartition, List<IncomingMessageEnvelope>> map = new HashMap<SystemStreamPartition, List<IncomingMessageEnvelope>>();
+ assertEquals(1, systemStreamPartitions.size());
+ SystemStreamPartition systemStreamPartition = systemStreamPartitions.iterator().next();
+ assertEquals(expectedSystemStreamPartition, systemStreamPartition);
+
+ if (pollCount++ == 0) {
+ List<IncomingMessageEnvelope> list = new ArrayList<IncomingMessageEnvelope>();
+ SetConfig setConfig1 = new SetConfig("test", "job.name", "my-job-name");
+ SetConfig setConfig2 = new SetConfig("test", "job.id", "1234");
+ Delete delete = new Delete("test", "job.name", SetConfig.TYPE);
+ list.add(new IncomingMessageEnvelope(systemStreamPartition, null, serialize(setConfig1.getKeyArray()), serialize(setConfig1.getMessageMap())));
+ list.add(new IncomingMessageEnvelope(systemStreamPartition, null, serialize(setConfig2.getKeyArray()), serialize(setConfig2.getMessageMap())));
+ list.add(new IncomingMessageEnvelope(systemStreamPartition, null, serialize(delete.getKeyArray()), delete.getMessageMap()));
+ map.put(systemStreamPartition, list);
+ }
+
+ return map;
+ }
+
+ private byte[] serialize(Object obj) {
+ try {
+ return SamzaObjectMapper.getObjectMapper().writeValueAsString(obj).getBytes("UTF-8");
+ } catch (Exception e) {
+ throw new SamzaException(e);
+ }
+ }
+
+ public boolean isStarted() {
+ return started;
+ }
+
+ public boolean isStopped() {
+ return stopped;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemProducer.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemProducer.java b/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemProducer.java
new file mode 100644
index 0000000..728fa53
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemProducer.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator.stream;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.coordinator.stream.CoordinatorStreamMessage.Delete;
+import org.apache.samza.coordinator.stream.CoordinatorStreamMessage.SetConfig;
+import org.apache.samza.serializers.model.SamzaObjectMapper;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.SystemProducer;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.util.SinglePartitionWithoutOffsetsSystemAdmin;
+import org.codehaus.jackson.type.TypeReference;
+import org.junit.Test;
+
+public class TestCoordinatorStreamSystemProducer {
+ @Test
+ public void testCoordinatorStreamSystemProducer() {
+ String source = "source";
+ SystemStream systemStream = new SystemStream("system", "stream");
+ MockSystemProducer systemProducer = new MockSystemProducer(source);
+ MockSystemAdmin systemAdmin = new MockSystemAdmin();
+ CoordinatorStreamSystemProducer producer = new CoordinatorStreamSystemProducer(systemStream, systemProducer, systemAdmin);
+ SetConfig setConfig1 = new SetConfig(source, "job.name", "my-job-name");
+ SetConfig setConfig2 = new SetConfig(source, "job.id", "1234");
+ Delete delete = new Delete(source, "job.name", SetConfig.TYPE);
+ assertFalse(systemProducer.isRegistered());
+ producer.register(source);
+ assertTrue(systemProducer.isRegistered());
+ assertFalse(systemProducer.isStarted());
+ producer.start();
+ assertTrue(systemProducer.isStarted());
+ producer.send(setConfig1);
+ producer.send(setConfig2);
+ producer.send(delete);
+ assertFalse(systemProducer.isStopped());
+ producer.stop();
+ assertTrue(systemProducer.isStopped());
+ List<OutgoingMessageEnvelope> envelopes = systemProducer.getEnvelopes();
+ OutgoingMessageEnvelope envelope0 = envelopes.get(0);
+ OutgoingMessageEnvelope envelope1 = envelopes.get(1);
+ OutgoingMessageEnvelope envelope2 = envelopes.get(2);
+ TypeReference<Object[]> keyRef = new TypeReference<Object[]>() {
+ };
+ TypeReference<Map<String, Object>> msgRef = new TypeReference<Map<String, Object>>() {
+ };
+ assertEquals(3, envelopes.size());
+ assertEquals(new CoordinatorStreamMessage(setConfig1), new CoordinatorStreamMessage(deserialize((byte[]) envelope0.getKey(), keyRef), deserialize((byte[]) envelope0.getMessage(), msgRef)));
+ assertEquals(new CoordinatorStreamMessage(setConfig2), new CoordinatorStreamMessage(deserialize((byte[]) envelope1.getKey(), keyRef), deserialize((byte[]) envelope1.getMessage(), msgRef)));
+ assertEquals(new CoordinatorStreamMessage(delete), new CoordinatorStreamMessage(deserialize((byte[]) envelope2.getKey(), keyRef), deserialize((byte[]) envelope2.getMessage(), msgRef)));
+ }
+
+ private <T> T deserialize(byte[] bytes, TypeReference<T> reference) {
+ try {
+ if (bytes != null) {
+ String valueStr = new String((byte[]) bytes, "UTF-8");
+ return SamzaObjectMapper.getObjectMapper().readValue(valueStr, reference);
+ }
+ } catch (Exception e) {
+ throw new SamzaException(e);
+ }
+
+ return null;
+ }
+
+ private static class MockSystemAdmin extends SinglePartitionWithoutOffsetsSystemAdmin implements SystemAdmin {
+ public void createCoordinatorStream(String streamName) {
+ // Do nothing.
+ }
+ }
+
+ private static class MockSystemProducer implements SystemProducer {
+ private final String expectedSource;
+ private final List<OutgoingMessageEnvelope> envelopes;
+ private boolean started = false;
+ private boolean stopped = false;
+ private boolean registered = false;
+ private boolean flushed = false;
+
+ public MockSystemProducer(String expectedSource) {
+ this.expectedSource = expectedSource;
+ this.envelopes = new ArrayList<OutgoingMessageEnvelope>();
+ }
+
+ public void start() {
+ started = true;
+ }
+
+ public void stop() {
+ stopped = true;
+ }
+
+ public void register(String source) {
+ assertEquals(expectedSource, source);
+ registered = true;
+ }
+
+ public void send(String source, OutgoingMessageEnvelope envelope) {
+ envelopes.add(envelope);
+ }
+
+ public void flush(String source) {
+ assertEquals(expectedSource, source);
+ flushed = true;
+ }
+
+ public List<OutgoingMessageEnvelope> getEnvelopes() {
+ return envelopes;
+ }
+
+ public boolean isStarted() {
+ return started;
+ }
+
+ public boolean isStopped() {
+ return stopped;
+ }
+
+ public boolean isRegistered() {
+ return registered;
+ }
+
+ public boolean isFlushed() {
+ return flushed;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java b/samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java
index 76bc681..72b134c 100644
--- a/samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java
+++ b/samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java
@@ -40,12 +40,12 @@ public class TestSamzaObjectMapper {
public void testJsonTaskModel() throws Exception {
ObjectMapper mapper = SamzaObjectMapper.getObjectMapper();
Map<String, String> configMap = new HashMap<String, String>();
+ Map<SystemStreamPartition, String> sspOffset = new HashMap<SystemStreamPartition, String>();
configMap.put("a", "b");
Config config = new MapConfig(configMap);
- Set<SystemStreamPartition> inputSystemStreamPartitions = new HashSet<SystemStreamPartition>();
- inputSystemStreamPartitions.add(new SystemStreamPartition("foo", "bar", new Partition(1)));
TaskName taskName = new TaskName("test");
- TaskModel taskModel = new TaskModel(taskName, inputSystemStreamPartitions, new Partition(2));
+ sspOffset.put(new SystemStreamPartition("foo", "bar", new Partition(1)), "");
+ TaskModel taskModel = new TaskModel(taskName, sspOffset, new Partition(2));
Map<TaskName, TaskModel> tasks = new HashMap<TaskName, TaskModel>();
tasks.put(taskName, taskModel);
ContainerModel containerModel = new ContainerModel(1, tasks);
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/test/resources/test.properties
----------------------------------------------------------------------
diff --git a/samza-core/src/test/resources/test.properties b/samza-core/src/test/resources/test.properties
index 9348c7d..41eb82e 100644
--- a/samza-core/src/test/resources/test.properties
+++ b/samza-core/src/test/resources/test.properties
@@ -22,3 +22,4 @@
job.factory.class=org.apache.samza.job.MockJobFactory
job.name=test-job
foo=bar
+systems.coordinator.samza.factory=org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala
index af800df..0ba932c 100644
--- a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala
+++ b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestCheckpointTool.scala
@@ -31,8 +31,9 @@ import org.mockito.Matchers._
import org.mockito.Mockito._
import org.scalatest.junit.AssertionsForJUnit
import org.scalatest.mock.MockitoSugar
-
import scala.collection.JavaConversions._
+import org.apache.samza.config.JobConfig
+import org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory
object TestCheckpointTool {
var checkpointManager: CheckpointManager = null
@@ -40,8 +41,8 @@ object TestCheckpointTool {
var systemProducer: SystemProducer = null
var systemAdmin: SystemAdmin = null
- class MockCheckpointManagerFactory extends CheckpointManagerFactory {
- override def getCheckpointManager(config: Config, registry: MetricsRegistry) = checkpointManager
+ class MockCheckpointManagerFactory {
+ def getCheckpointManager(config: Config, registry: MetricsRegistry) = checkpointManager
}
class MockSystemFactory extends SystemFactory {
@@ -62,15 +63,17 @@ class TestCheckpointTool extends AssertionsForJUnit with MockitoSugar {
@Before
def setup {
config = new MapConfig(Map(
+ JobConfig.JOB_NAME -> "test",
+ JobConfig.JOB_COORDINATOR_SYSTEM -> "coordinator",
TaskConfig.INPUT_STREAMS -> "test.foo",
TaskConfig.CHECKPOINT_MANAGER_FACTORY -> classOf[MockCheckpointManagerFactory].getName,
- SystemConfig.SYSTEM_FACTORY.format("test") -> classOf[MockSystemFactory].getName
+ SystemConfig.SYSTEM_FACTORY.format("test") -> classOf[MockSystemFactory].getName,
+ SystemConfig.SYSTEM_FACTORY.format("coordinator") -> classOf[MockCoordinatorStreamSystemFactory].getName
))
val metadata = new SystemStreamMetadata("foo", Map[Partition, SystemStreamPartitionMetadata](
new Partition(0) -> new SystemStreamPartitionMetadata("0", "100", "101"),
new Partition(1) -> new SystemStreamPartitionMetadata("0", "200", "201")
))
-
TestCheckpointTool.checkpointManager = mock[CheckpointManager]
TestCheckpointTool.systemAdmin = mock[SystemAdmin]
when(TestCheckpointTool.systemAdmin.getSystemStreamMetadata(Set("foo")))
@@ -79,12 +82,12 @@ class TestCheckpointTool extends AssertionsForJUnit with MockitoSugar {
.thenReturn(new Checkpoint(Map(new SystemStreamPartition("test", "foo", p0) -> "1234")))
when(TestCheckpointTool.checkpointManager.readLastCheckpoint(tn1))
.thenReturn(new Checkpoint(Map(new SystemStreamPartition("test", "foo", p1) -> "4321")))
-
}
@Test
def testReadLatestCheckpoint {
- new CheckpointTool(config, null).run
+ val checkpointTool = new CheckpointTool(config, null, TestCheckpointTool.checkpointManager)
+ checkpointTool.run
verify(TestCheckpointTool.checkpointManager).readLastCheckpoint(tn0)
verify(TestCheckpointTool.checkpointManager).readLastCheckpoint(tn1)
verify(TestCheckpointTool.checkpointManager, never()).writeCheckpoint(any(), any())
@@ -95,7 +98,8 @@ class TestCheckpointTool extends AssertionsForJUnit with MockitoSugar {
val toOverwrite = Map(tn0 -> Map(new SystemStreamPartition("test", "foo", p0) -> "42"),
tn1 -> Map(new SystemStreamPartition("test", "foo", p1) -> "43"))
- new CheckpointTool(config, toOverwrite).run
+ val checkpointTool = new CheckpointTool(config, toOverwrite, TestCheckpointTool.checkpointManager)
+ checkpointTool.run
verify(TestCheckpointTool.checkpointManager)
.writeCheckpoint(tn0, new Checkpoint(Map(new SystemStreamPartition("test", "foo", p0) -> "42")))
verify(TestCheckpointTool.checkpointManager)
http://git-wip-us.apache.org/repos/asf/samza/blob/23fb2e1c/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
index a281e79..8d54c46 100644
--- a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
+++ b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
@@ -65,7 +65,7 @@ class TestOffsetManager {
val config = new MapConfig
val checkpointManager = getCheckpointManager(systemStreamPartition, taskName)
val systemAdmins = Map("test-system" -> getSystemAdmin)
- val offsetManager = OffsetManager(systemStreamMetadata, config, checkpointManager, systemAdmins)
+ val offsetManager = OffsetManager(systemStreamMetadata, config, checkpointManager, systemAdmins, new OffsetManagerMetrics, checkpointManager.getOffets)
offsetManager.register(taskName, Set(systemStreamPartition))
offsetManager.start
assertTrue(checkpointManager.isStarted)
@@ -97,7 +97,7 @@ class TestOffsetManager {
val config = new MapConfig
val checkpointManager = getCheckpointManager(systemStreamPartition, taskName)
val systemAdmins = Map("test-system" -> getSystemAdmin)
- val offsetManager = OffsetManager(systemStreamMetadata, config, checkpointManager, systemAdmins)
+ val offsetManager = OffsetManager(systemStreamMetadata, config, checkpointManager, systemAdmins, new OffsetManagerMetrics, checkpointManager.getOffets)
offsetManager.register(taskName, Set(systemStreamPartition))
offsetManager.start
// Should get offset 45 back from the checkpoint manager, which is last processed, and system admin should return 46 as starting offset.
@@ -179,22 +179,6 @@ class TestOffsetManager {
}
}
- @Ignore("OffsetManager.start is supposed to throw an exception - but it doesn't") @Test
- def testShouldFailWhenMissingDefault {
- val taskName = new TaskName("c")
- val systemStream = new SystemStream("test-system", "test-stream")
- val partition = new Partition(0)
- val systemStreamPartition = new SystemStreamPartition(systemStream, partition)
- val testStreamMetadata = new SystemStreamMetadata(systemStream.getStream, Map(partition -> new SystemStreamPartitionMetadata("0", "1", "2")))
- val systemStreamMetadata = Map(systemStream -> testStreamMetadata)
- val offsetManager = OffsetManager(systemStreamMetadata, new MapConfig(Map[String, String]()))
- offsetManager.register(taskName, Set(systemStreamPartition))
-
- intercept[SamzaException] {
- offsetManager.start
- }
- }
-
@Test
def testDefaultSystemShouldFailWhenFailIsSpecified {
val systemStream = new SystemStream("test-system", "test-stream")
@@ -255,24 +239,26 @@ class TestOffsetManager {
assertEquals(Some("13"), offsetManager.getStartingOffset(ssp))
}
+
private def getCheckpointManager(systemStreamPartition: SystemStreamPartition, taskName:TaskName = new TaskName("taskName")) = {
val checkpoint = new Checkpoint(Map(systemStreamPartition -> "45"))
-
- new CheckpointManager {
+ new CheckpointManager(null, null, null) {
var isStarted = false
var isStopped = false
var registered = Set[TaskName]()
var checkpoints: Map[TaskName, Checkpoint] = Map(taskName -> checkpoint)
var taskNameToPartitionMapping: util.Map[TaskName, java.lang.Integer] = new util.HashMap[TaskName, java.lang.Integer]()
- def start { isStarted = true }
- def register(taskName: TaskName) { registered += taskName }
- def writeCheckpoint(taskName: TaskName, checkpoint: Checkpoint) { checkpoints += taskName -> checkpoint }
- def readLastCheckpoint(taskName: TaskName) = checkpoints.getOrElse(taskName, null)
- def stop { isStopped = true }
-
- override def writeChangeLogPartitionMapping(mapping: util.Map[TaskName, java.lang.Integer]): Unit = taskNameToPartitionMapping = mapping
+ override def start { isStarted = true }
+ override def register(taskName: TaskName) { registered += taskName }
+ override def writeCheckpoint(taskName: TaskName, checkpoint: Checkpoint) { checkpoints += taskName -> checkpoint }
+ override def readLastCheckpoint(taskName: TaskName) = checkpoints.getOrElse(taskName, null)
+ override def stop { isStopped = true }
- override def readChangeLogPartitionMapping(): util.Map[TaskName, java.lang.Integer] = taskNameToPartitionMapping
+ // Only for testing purposes - not present in actual checkpoint manager
+ def getOffets: util.Map[SystemStreamPartition, String] =
+ {
+ checkpoint.getOffsets()
+ }
}
}
@@ -284,8 +270,12 @@ class TestOffsetManager {
def getSystemStreamMetadata(streamNames: java.util.Set[String]) =
Map[String, SystemStreamMetadata]()
- override def createChangelogStream(topicName: String, numOfChangeLogPartitions: Int) = {
- new SamzaException("Method not implemented")
+ override def createChangelogStream(topicName: String, numOfChangeLogPartitions: Int) {
+ new UnsupportedOperationException("Method not implemented.")
+ }
+
+ override def createCoordinatorStream(streamName: String) {
+ new UnsupportedOperationException("Method not implemented.")
}
}
}