Posted to dev@samza.apache.org by Malcolm McFarland <mm...@cavulus.com> on 2022/08/30 00:52:44 UTC
Running v1.7.0 locally
Hey folks,
I've recently been attempting to upgrade our legacy application from Samza
1.5.1 to 1.7.0. With version 1.5.1, I've had no problems running the
application with this command:
./bin/run-app.sh --config-path=path/to/file.properties
Starting in 1.6.0, this doesn't seem to work. As far as I can tell, the
application starts up fully without errors and then simply shuts down,
again without any error. AFAICT it runs fine on YARN. Does Samza
v1.6.0+ support running local processes? I've tried this on both OS X and
Ubuntu, using Java 1.8.
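One generic JVM behavior produces exactly this "starts cleanly, then exits without error" symptom: when main() returns and only daemon threads remain, the JVM shuts down normally, and any work still scheduled on those threads (for example a debounced action) never runs. This thread does not establish that this is what happens in Samza 1.6.0+. The Java sketch below is a self-contained illustration only, and its class, method, and thread names (DaemonExitDemo, scheduleOnDaemon, awaitFired, "debounce-demo") are hypothetical.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

// Hypothetical illustration, not Samza code: a delayed action on a daemon
// thread never runs if main() returns before the delay elapses.
public class DaemonExitDemo {
    static final AtomicBoolean fired = new AtomicBoolean(false);

    /** Schedules a one-shot action on a single daemon scheduler thread. */
    static ScheduledExecutorService scheduleOnDaemon(long delayMs) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "debounce-demo");
            t.setDaemon(true); // daemon threads do not keep the JVM alive
            return t;
        });
        scheduler.schedule(() -> fired.set(true), delayMs, TimeUnit.MILLISECONDS);
        return scheduler;
    }

    /** Polls for up to timeoutMs and reports whether the action has fired. */
    static boolean awaitFired(long timeoutMs) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        while (!fired.get() && System.nanoTime() - deadline < 0) {
            LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(5));
        }
        return fired.get();
    }

    public static void main(String[] args) {
        // Runs on normal JVM exit, so it reveals whether the action ever fired.
        Runtime.getRuntime().addShutdownHook(new Thread(() ->
                System.out.println("JVM exiting; delayed action fired? " + fired.get())));
        scheduleOnDaemon(20_000); // same delay as the OnProcessorChange debounce in the log
        // main() returns here; only the daemon thread is left, so the JVM exits
        // normally, with no exception and no error output.
    }
}
```

Running main() prints "JVM exiting; delayed action fired? false", because nothing keeps the JVM alive for the 20-second delay. A caller that keeps waiting (as awaitFired does by polling) lets the action fire.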
Here are the relevant portions of the properties file:
task.class=com.cavulus.task.SimpleLegacyTask
job.factory.class=org.apache.samza.job.local.ThreadJobFactory
job.default.system=kafka
systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
job.name=simple-legacy-task
task.inputs=kafka.event-input
...plus serdes, ZooKeeper configuration, etc., etc. Here are the last few
lines of logging output:
2022-08-29 17:19:42,842 DEBUG [org.apache.kafka.clients.NetworkClient]
[Consumer clientId=kafka_admin_consumer-simple_legacy_task-1,
groupId=simple-legacy-task-1] Sending metadata request
(type=MetadataRequest, topics=) to node localhost:9092 (id: -1 rack: null)
2022-08-29 17:19:42,843 INFO [org.apache.kafka.clients.Metadata]
Cluster ID: fwnjhL2kQayFxN0xpatT-g
2022-08-29 17:19:42,843 DEBUG [org.apache.kafka.clients.Metadata]
Updated cluster metadata version 2 to Cluster(id = fwnjhL2kQayFxN0xpatT-g,
nodes = [localhost:9092 (id: 0 rack: null)], partitions = [], controller =
localhost:9092 (id: 0 rack: null))
2022-08-29 17:19:42,843 DEBUG
[org.apache.samza.system.kafka.KafkaSystemAdmin] Stream
simple-legacy-task-broadcast-stream has partitions [Partition(topic =
simple-legacy-task-broadcast-stream, partition = 0, leader = 0, replicas =
[0], isr = [0], offlineReplicas = [])]
2022-08-29 17:19:42,844 DEBUG [org.apache.kafka.clients.NetworkClient]
[Consumer clientId=kafka_admin_consumer-simple_legacy_task-1,
groupId=simple-legacy-task-1] Initiating connection to node localhost:9092
(id: 0 rack: null)
2022-08-29 17:19:42,844 DEBUG [org.apache.kafka.common.metrics.Metrics]
Added sensor with name node-0.bytes-sent
2022-08-29 17:19:42,844 DEBUG [org.apache.kafka.common.metrics.Metrics]
Added sensor with name node-0.bytes-received
2022-08-29 17:19:42,844 DEBUG [org.apache.kafka.common.metrics.Metrics]
Added sensor with name node-0.latency
2022-08-29 17:19:42,844 DEBUG [org.apache.kafka.common.network.Selector]
[Consumer clientId=kafka_admin_consumer-simple_legacy_task-1,
groupId=simple-legacy-task-1] Created socket with SO_RCVBUF = 342972,
SO_SNDBUF = 146988, SO_TIMEOUT = 0 to node 0
2022-08-29 17:19:42,844 DEBUG [org.apache.kafka.clients.NetworkClient]
[Consumer clientId=kafka_admin_consumer-simple_legacy_task-1,
groupId=simple-legacy-task-1] Completed connection to node 0. Fetching API
versions.
2022-08-29 17:19:42,844 DEBUG [org.apache.kafka.clients.NetworkClient]
[Consumer clientId=kafka_admin_consumer-simple_legacy_task-1,
groupId=simple-legacy-task-1] Initiating API versions fetch from node 0.
2022-08-29 17:19:42,845 DEBUG [org.apache.kafka.clients.NetworkClient]
[Consumer clientId=kafka_admin_consumer-simple_legacy_task-1,
groupId=simple-legacy-task-1] Recorded API versions for node 0:
(Produce(0): 0 to 7 [usable: 6], Fetch(1): 0 to 11 [usable: 8],
ListOffsets(2): 0 to 5 [usable: 3], Metadata(3): 0 to 8 [usable: 6],
LeaderAndIsr(4): 0 to 2 [usable: 1], StopReplica(5): 0 to 1 [usable: 0],
UpdateMetadata(6): 0 to 5 [usable: 4], ControlledShutdown(7): 0 to 2
[usable: 1], OffsetCommit(8): 0 to 7 [usable: 4], OffsetFetch(9): 0 to 5
[usable: 4], FindCoordinator(10): 0 to 2 [usable: 2], JoinGroup(11): 0 to 5
[usable: 3], Heartbeat(12): 0 to 3 [usable: 2], LeaveGroup(13): 0 to 2
[usable: 2], SyncGroup(14): 0 to 3 [usable: 2], DescribeGroups(15): 0 to 3
[usable: 2], ListGroups(16): 0 to 2 [usable: 2], SaslHandshake(17): 0 to 1
[usable: 1], ApiVersions(18): 0 to 2 [usable: 2], CreateTopics(19): 0 to 3
[usable: 3], DeleteTopics(20): 0 to 3 [usable: 2], DeleteRecords(21): 0 to
1 [usable: 1], InitProducerId(22): 0 to 1 [usable: 1],
OffsetForLeaderEpoch(23): 0 to 3 [usable: 1], AddPartitionsToTxn(24): 0 to
1 [usable: 1], AddOffsetsToTxn(25): 0 to 1 [usable: 1], EndTxn(26): 0 to 1
[usable: 1], WriteTxnMarkers(27): 0 [usable: 0], TxnOffsetCommit(28): 0 to
2 [usable: 1], DescribeAcls(29): 0 to 1 [usable: 1], CreateAcls(30): 0 to 1
[usable: 1], DeleteAcls(31): 0 to 1 [usable: 1], DescribeConfigs(32): 0 to
2 [usable: 2], AlterConfigs(33): 0 to 1 [usable: 1],
AlterReplicaLogDirs(34): 0 to 1 [usable: 1], DescribeLogDirs(35): 0 to 1
[usable: 1], SaslAuthenticate(36): 0 to 1 [usable: 0],
CreatePartitions(37): 0 to 1 [usable: 1], CreateDelegationToken(38): 0 to 1
[usable: 1], RenewDelegationToken(39): 0 to 1 [usable: 1],
ExpireDelegationToken(40): 0 to 1 [usable: 1], DescribeDelegationToken(41):
0 to 1 [usable: 1], DeleteGroups(42): 0 to 1 [usable: 1], UNKNOWN(43): 0,
UNKNOWN(44): 0)
2022-08-29 17:19:42,846 DEBUG
[org.apache.samza.system.kafka.KafkaSystemAdmin] Stream event-input has
partitions [Partition(topic = event-input, partition = 0, leader = 0,
replicas = [0], isr = [0], offlineReplicas = [])]
2022-08-29 17:19:42,846 INFO
[org.apache.samza.system.kafka.KafkaSystemAdmin] SystemStream partition
counts for system kafka: {event-input=SystemStreamMetadata
[streamName=event-input, partitionMetadata={Partition
[partition=0]=SystemStreamPartitionMetadata [oldestOffset=null,
newestOffset=null, upcomingOffset=null]}],
simple-legacy-task-broadcast-stream=SystemStreamMetadata
[streamName=simple-legacy-task-broadcast-stream,
partitionMetadata={Partition [partition=0]=SystemStreamPartitionMetadata
[oldestOffset=null, newestOffset=null, upcomingOffset=null]}]}
2022-08-29 17:19:42,850 DEBUG
[org.apache.samza.metrics.MetricsRegistryMap] Creating new gauge
job-coordinator kafka-event-input-partitionCount 0.
2022-08-29 17:19:42,850 DEBUG
[org.apache.samza.metrics.MetricsRegistryMap] Creating new gauge
job-coordinator kafka-simple-legacy-task-broadcast-stream-partitionCount 0.
2022-08-29 17:19:42,851 INFO
[org.apache.samza.zk.ScheduleAfterDebounceTime] Trying to cancel the
action: OnProcessorChange.
2022-08-29 17:19:42,852 INFO
[org.apache.samza.zk.ScheduleAfterDebounceTime] Scheduled action:
OnProcessorChange to run after: 20000 milliseconds.
2022-08-29 17:19:42,852 INFO [org.apache.samza.zk.ZkUtils] subscribing
for jm version change
at:/app-simple-legacy-task-1/simple-legacy-task-1-2.0-coordinationData/jobModelGeneration/jobModelVersion
2022-08-29 17:19:42,853 DEBUG [org.apache.zookeeper.ClientCnxn] Reading
reply sessionid:0x1000479a11c0066, packet:: clientPath:null serverPath:null
finished:false header:: 14,3 replyHeader:: 14,265013,0 request::
'/app-simple-legacy-task-1/simple-legacy-task-1-2.0-coordinationData/jobModelGeneration/jobModelVersion,T
response:: s{176584,263880,1630709112307,1661453291684,84,0,0,0,2,0,176584}
2022-08-29 17:19:42,853 DEBUG [org.I0Itec.zkclient.ZkClient] Subscribed
data changes for
/app-simple-legacy-task-1/simple-legacy-task-1-2.0-coordinationData/jobModelGeneration/jobModelVersion
At which point the application silently exits.
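A related thing to check: the log shows ZooKeeper-based coordination classes (org.apache.samza.zk.ZkUtils, org.apache.samza.zk.ScheduleAfterDebounceTime) running alongside job.factory.class=org.apache.samza.job.local.ThreadJobFactory, so it may help to print which job and coordinator settings the properties file actually resolves to. The minimal sketch below uses only java.util.Properties. The class and method names are hypothetical, and treating job.coordinator.factory as the key that selects a ZooKeeper coordinator is an assumption based on Samza's standalone configuration, not something stated in this thread.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical helper: prints the settings that decide how the job is launched.
public class ConfigCheck {
    // Keys from the properties quoted above, plus job.coordinator.factory
    // (assumed to be where a ZooKeeper-based coordinator would be configured).
    static final String[] KEYS = {
        "job.name", "job.factory.class", "job.default.system",
        "task.class", "task.inputs", "job.coordinator.factory"
    };

    /** Maps each key of interest to its value, or "<unset>" if it is absent. */
    static Map<String, String> summarize(String propertiesText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        Map<String, String> summary = new LinkedHashMap<>();
        for (String key : KEYS) {
            summary.put(key, props.getProperty(key, "<unset>"));
        }
        return summary;
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 1) {
            System.err.println("usage: ConfigCheck <path to the file passed as --config-path>");
            return;
        }
        String text = new String(Files.readAllBytes(Paths.get(args[0])), StandardCharsets.UTF_8);
        summarize(text).forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

For the file quoted above, job.factory.class would print as org.apache.samza.job.local.ThreadJobFactory; the coordinator value depends on the ZooKeeper settings that were left out of the message.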
Thanks in advance for any advice, ideas, things to check, etc.
Cheers,
Malcolm McFarland
Cavulus
Re: Running v1.7.0 locally
Posted by Yi Pan <ni...@gmail.com>.
Hey, Malcolm,
Thanks for reporting this issue. Could you open a JIRA to track that?
Best!
-Yi