Posted to dev@samza.apache.org by Malcolm McFarland <mm...@cavulus.com> on 2022/08/30 00:52:44 UTC

Running v1.7.0 locally

Hey folks,

I've recently been attempting to upgrade our legacy application from Samza
1.5.1 to 1.7.0. With version 1.5.1, I've had no problems running the
application with this command:

./bin/run-app.sh --config-path=path/to/file.properties

Starting in 1.6.0, this no longer seems to work. As far as I can tell, the
application starts up fully without errors and then simply shuts down,
again without any error. AFAICT it runs fine on YARN. Does Samza
v1.6.0+ support running local processes? I've tried this on both OS X and
Ubuntu, using Java 1.8.

Here are the relevant portions of the properties file:

task.class=com.cavulus.task.SimpleLegacyTask
job.factory.class=org.apache.samza.job.local.ThreadJobFactory
job.default.system=kafka
systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
job.name=simple-legacy-task
task.inputs=kafka.event-input

...plus serdes, ZooKeeper configuration, etc. Here are the last few
lines of logging output:

2022-08-29 17:19:42,842  DEBUG  [org.apache.kafka.clients.NetworkClient]
 [Consumer clientId=kafka_admin_consumer-simple_legacy_task-1,
groupId=simple-legacy-task-1] Sending metadata request
(type=MetadataRequest, topics=) to node localhost:9092 (id: -1 rack: null)
2022-08-29 17:19:42,843  INFO   [org.apache.kafka.clients.Metadata]
 Cluster ID: fwnjhL2kQayFxN0xpatT-g
2022-08-29 17:19:42,843  DEBUG  [org.apache.kafka.clients.Metadata]
 Updated cluster metadata version 2 to Cluster(id = fwnjhL2kQayFxN0xpatT-g,
nodes = [localhost:9092 (id: 0 rack: null)], partitions = [], controller =
localhost:9092 (id: 0 rack: null))
2022-08-29 17:19:42,843  DEBUG
 [org.apache.samza.system.kafka.KafkaSystemAdmin]  Stream
simple-legacy-task-broadcast-stream has partitions [Partition(topic =
simple-legacy-task-broadcast-stream, partition = 0, leader = 0, replicas =
[0], isr = [0], offlineReplicas = [])]
2022-08-29 17:19:42,844  DEBUG  [org.apache.kafka.clients.NetworkClient]
 [Consumer clientId=kafka_admin_consumer-simple_legacy_task-1,
groupId=simple-legacy-task-1] Initiating connection to node localhost:9092
(id: 0 rack: null)
2022-08-29 17:19:42,844  DEBUG  [org.apache.kafka.common.metrics.Metrics]
 Added sensor with name node-0.bytes-sent
2022-08-29 17:19:42,844  DEBUG  [org.apache.kafka.common.metrics.Metrics]
 Added sensor with name node-0.bytes-received
2022-08-29 17:19:42,844  DEBUG  [org.apache.kafka.common.metrics.Metrics]
 Added sensor with name node-0.latency
2022-08-29 17:19:42,844  DEBUG  [org.apache.kafka.common.network.Selector]
 [Consumer clientId=kafka_admin_consumer-simple_legacy_task-1,
groupId=simple-legacy-task-1] Created socket with SO_RCVBUF = 342972,
SO_SNDBUF = 146988, SO_TIMEOUT = 0 to node 0
2022-08-29 17:19:42,844  DEBUG  [org.apache.kafka.clients.NetworkClient]
 [Consumer clientId=kafka_admin_consumer-simple_legacy_task-1,
groupId=simple-legacy-task-1] Completed connection to node 0. Fetching API
versions.
2022-08-29 17:19:42,844  DEBUG  [org.apache.kafka.clients.NetworkClient]
 [Consumer clientId=kafka_admin_consumer-simple_legacy_task-1,
groupId=simple-legacy-task-1] Initiating API versions fetch from node 0.
2022-08-29 17:19:42,845  DEBUG  [org.apache.kafka.clients.NetworkClient]
 [Consumer clientId=kafka_admin_consumer-simple_legacy_task-1,
groupId=simple-legacy-task-1] Recorded API versions for node 0:
(Produce(0): 0 to 7 [usable: 6], Fetch(1): 0 to 11 [usable: 8],
ListOffsets(2): 0 to 5 [usable: 3], Metadata(3): 0 to 8 [usable: 6],
LeaderAndIsr(4): 0 to 2 [usable: 1], StopReplica(5): 0 to 1 [usable: 0],
UpdateMetadata(6): 0 to 5 [usable: 4], ControlledShutdown(7): 0 to 2
[usable: 1], OffsetCommit(8): 0 to 7 [usable: 4], OffsetFetch(9): 0 to 5
[usable: 4], FindCoordinator(10): 0 to 2 [usable: 2], JoinGroup(11): 0 to 5
[usable: 3], Heartbeat(12): 0 to 3 [usable: 2], LeaveGroup(13): 0 to 2
[usable: 2], SyncGroup(14): 0 to 3 [usable: 2], DescribeGroups(15): 0 to 3
[usable: 2], ListGroups(16): 0 to 2 [usable: 2], SaslHandshake(17): 0 to 1
[usable: 1], ApiVersions(18): 0 to 2 [usable: 2], CreateTopics(19): 0 to 3
[usable: 3], DeleteTopics(20): 0 to 3 [usable: 2], DeleteRecords(21): 0 to
1 [usable: 1], InitProducerId(22): 0 to 1 [usable: 1],
OffsetForLeaderEpoch(23): 0 to 3 [usable: 1], AddPartitionsToTxn(24): 0 to
1 [usable: 1], AddOffsetsToTxn(25): 0 to 1 [usable: 1], EndTxn(26): 0 to 1
[usable: 1], WriteTxnMarkers(27): 0 [usable: 0], TxnOffsetCommit(28): 0 to
2 [usable: 1], DescribeAcls(29): 0 to 1 [usable: 1], CreateAcls(30): 0 to 1
[usable: 1], DeleteAcls(31): 0 to 1 [usable: 1], DescribeConfigs(32): 0 to
2 [usable: 2], AlterConfigs(33): 0 to 1 [usable: 1],
AlterReplicaLogDirs(34): 0 to 1 [usable: 1], DescribeLogDirs(35): 0 to 1
[usable: 1], SaslAuthenticate(36): 0 to 1 [usable: 0],
CreatePartitions(37): 0 to 1 [usable: 1], CreateDelegationToken(38): 0 to 1
[usable: 1], RenewDelegationToken(39): 0 to 1 [usable: 1],
ExpireDelegationToken(40): 0 to 1 [usable: 1], DescribeDelegationToken(41):
0 to 1 [usable: 1], DeleteGroups(42): 0 to 1 [usable: 1], UNKNOWN(43): 0,
UNKNOWN(44): 0)
2022-08-29 17:19:42,846  DEBUG
 [org.apache.samza.system.kafka.KafkaSystemAdmin]  Stream event-input has
partitions [Partition(topic = event-input, partition = 0, leader = 0,
replicas = [0], isr = [0], offlineReplicas = [])]
2022-08-29 17:19:42,846  INFO
[org.apache.samza.system.kafka.KafkaSystemAdmin]  SystemStream partition
counts for system kafka: {event-input=SystemStreamMetadata
[streamName=event-input, partitionMetadata={Partition
[partition=0]=SystemStreamPartitionMetadata [oldestOffset=null,
newestOffset=null, upcomingOffset=null]}],
simple-legacy-task-broadcast-stream=SystemStreamMetadata
[streamName=simple-legacy-task-broadcast-stream,
partitionMetadata={Partition [partition=0]=SystemStreamPartitionMetadata
[oldestOffset=null, newestOffset=null, upcomingOffset=null]}]}
2022-08-29 17:19:42,850  DEBUG
 [org.apache.samza.metrics.MetricsRegistryMap]  Creating new gauge
job-coordinator kafka-event-input-partitionCount 0.
2022-08-29 17:19:42,850  DEBUG
 [org.apache.samza.metrics.MetricsRegistryMap]  Creating new gauge
job-coordinator kafka-simple-legacy-task-broadcast-stream-partitionCount 0.
2022-08-29 17:19:42,851  INFO
[org.apache.samza.zk.ScheduleAfterDebounceTime]  Trying to cancel the
action: OnProcessorChange.
2022-08-29 17:19:42,852  INFO
[org.apache.samza.zk.ScheduleAfterDebounceTime]  Scheduled action:
OnProcessorChange to run after: 20000 milliseconds.
2022-08-29 17:19:42,852  INFO   [org.apache.samza.zk.ZkUtils]   subscribing
for jm version change
at:/app-simple-legacy-task-1/simple-legacy-task-1-2.0-coordinationData/jobModelGeneration/jobModelVersion
2022-08-29 17:19:42,853  DEBUG  [org.apache.zookeeper.ClientCnxn]  Reading
reply sessionid:0x1000479a11c0066, packet:: clientPath:null serverPath:null
finished:false header:: 14,3  replyHeader:: 14,265013,0  request::
'/app-simple-legacy-task-1/simple-legacy-task-1-2.0-coordinationData/jobModelGeneration/jobModelVersion,T
 response:: s{176584,263880,1630709112307,1661453291684,84,0,0,0,2,0,176584}
2022-08-29 17:19:42,853  DEBUG  [org.I0Itec.zkclient.ZkClient]  Subscribed
data changes for
/app-simple-legacy-task-1/simple-legacy-task-1-2.0-coordinationData/jobModelGeneration/jobModelVersion

At which point the application silently exits.
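
One way to double-check that the exit really is clean, rather than a crash
that never reaches the log, would be to look at the process's exit status.
This is only a minimal sketch, assuming a POSIX shell and assuming that
run-app.sh passes the JVM's exit status through; run_and_report is a
hypothetical helper name, not part of Samza:

```shell
#!/bin/sh
# Hypothetical helper (not part of Samza): run a command and describe how
# it ended, so a "silent" exit can be told apart from a crash or a kill.
run_and_report() {
  status=0
  "$@" || status=$?
  if [ "$status" -eq 0 ]; then
    echo "exited cleanly (status 0)"
  elif [ "$status" -gt 128 ]; then
    # By shell convention, 128+N means the process died from signal N.
    echo "killed by signal $((status - 128))"
  else
    echo "exited with error status $status"
  fi
  return "$status"
}

# Usage with the command from this message (left commented out here):
# run_and_report ./bin/run-app.sh --config-path=path/to/file.properties
```

A status of 0 would suggest the JVM decided on its own that it was done,
while a non-zero status would point toward an error that is not being
logged.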

Thanks in advance for any advice, ideas, things to check, etc.

Cheers,
Malcolm McFarland
Cavulus

Re: Running v1.7.0 locally

Posted by Yi Pan <ni...@gmail.com>.

Hey, Malcolm,

Thanks for reporting this issue. Could you open a JIRA to track that?

Best!

-Yi
