Posted to commits@samza.apache.org by "Xinyu Liu (JIRA)" <ji...@apache.org> on 2018/01/04 20:34:01 UTC

[jira] [Updated] (SAMZA-1363) Create valid offset file when restoring state store.


     [ https://issues.apache.org/jira/browse/SAMZA-1363?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Xinyu Liu updated SAMZA-1363:
-----------------------------
    Fix Version/s:     (was: 0.14.0)
                   0.15.0

> Create valid offset file when restoring state store.
> -----------------------------------------------------
>
>                 Key: SAMZA-1363
>                 URL: https://issues.apache.org/jira/browse/SAMZA-1363
>             Project: Samza
>          Issue Type: Improvement
>            Reporter: Shanthoosh Venkataraman
>            Assignee: Shanthoosh Venkataraman
>             Fix For: 0.15.0
>
>
> TaskCoordinator.commit (or TaskCoordinator.shutdown) creates an offset file for the local store in Samza.
> The StateStoreGC feature (SAMZA-1158), which periodically deletes stale local stores, was introduced in samza-0.12.
> At LinkedIn, we found a race condition between StateStoreGC and a Samza container in its startup phase.
> Here’s an investigation detailing the events (all of them happened on the host hostname_masked_1.linkedin.com).
> LocalStoreMonitor.log: 
> {code:java}
> 2017-07-10 17:47:40 [pool-1-thread-1] LocalStoreMonitor [INFO] Job: jobName:waterloo-databus-isb-adapter jobId:i002 has the running status: stopped with preferred host: hostname_masked_2.linkedin.com.
> 2017-07-10 17:47:40 [pool-1-thread-1] LocalStoreMonitor [INFO] Store isb-state-store not used by the task: Partition 231.
> 2017-07-10 17:47:40 [pool-1-thread-1] LocalStoreMonitor [INFO] Deleting the task store: /export/content/data/samsa-yarn/samza-logged-stores/waterloo-databus-isb-adapter-i002/isb-state-store/Partition_231, since it has no offset file.
> {code}
> StateStoreGC observes three things:
> * The job is currently stopped.
> * The preferred host for the task is not localhost.
> * There’s no offset file associated with the local store (a local store with no offset file is considered stale and eligible for deletion).
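
The three checks above can be sketched in Java as a single predicate. This is a minimal illustration, not the actual LocalStoreMonitor code: the class name StaleStoreCheck, the method signature, and the offset file name OFFSET are assumptions made for the example.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical sketch of the three-condition staleness check described above.
final class StaleStoreCheck {
    // Assumed offset file name, used here for illustration only.
    static final String OFFSET_FILE_NAME = "OFFSET";

    // Returns true only when all three deletion conditions hold at the moment of the check.
    static boolean isEligibleForDeletion(boolean jobRunning,
                                         String preferredHost,
                                         String localHost,
                                         Path taskStoreDir) {
        boolean jobStopped = !jobRunning;
        boolean preferredElsewhere = !localHost.equals(preferredHost);
        boolean noOffsetFile = !Files.exists(taskStoreDir.resolve(OFFSET_FILE_NAME));
        return jobStopped && preferredElsewhere && noOffsetFile;
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("Partition_231");
        // Stopped job, preferred host elsewhere, no offset file.
        System.out.println(isEligibleForDeletion(false, "host2", "host1", dir));
        // Same inputs once an offset file exists in the store directory.
        Files.createFile(dir.resolve(OFFSET_FILE_NAME));
        System.out.println(isEligibleForDeletion(false, "host2", "host1", dir));
    }
}
```

The predicate is a point-in-time snapshot. Nothing in it prevents the job from being scheduled on this host between the check and the delete.
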
> However, at the same time, job samza-job-1 gets scheduled to run on hostname_masked_1.linkedin.com.
> SamzaContainer.log (the container belongs to Samza job waterloo-databus-isb-adapter-i002):
> {code:java}
> 2017-07-10 17:47:31 BrokerProxy [INFO] Creating new SimpleConsumer for host lva1-app17981.prod.linkedin.com:10251 for system samzametadatasystem
> 2017-07-10 17:47:31 GetOffset [INFO] Validating offset 0 for topic and partition [samza-job-1-002-dedup-state-store,231]
> 2017-07-10 17:47:31 GetOffset [INFO] Able to successfully read from offset 0 for topic and partition [samza-job-1-002-dedup-state-store,231]. Using it to instantiate consumer.
> 2017-07-10 17:47:31 BrokerProxy [INFO] Starting BrokerProxy for hostname_masked.linkedin.com:10251
> 2017-07-10 17:47:39 KeyValueStorageEngine [INFO] 1000000 entries restored...
> 2017-07-10 17:47:40 SamzaContainer [ERROR] Caught exception/error in process loop.
> org.rocksdb.RocksDBException: 
>     at org.rocksdb.RocksDB.delete(Native Method)
>     at org.rocksdb.RocksDB.delete(RocksDB.java:1142)
>     at org.apache.samza.storage.kv.RocksDbKeyValueStore.putAll(RocksDbKeyValueStore.scala:161)
>     at org.apache.samza.storage.kv.KeyValueStorageEngine$$anonfun$restore$1.apply(KeyValueStorageEngine.scala:111)
>     at org.apache.samza.storage.kv.KeyValueStorageEngine$$anonfun$restore$1.apply(KeyValueStorageEngine.scala:104)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>     at org.apache.samza.storage.kv.KeyValueStorageEngine.restore(KeyValueStorageEngine.scala:104)
>     at org.apache.samza.storage.TaskStorageManager$$anonfun$restoreStores$3.apply(TaskStorageManager.scala:263)
>     at org.apache.samza.storage.TaskStorageManager$$anonfun$restoreStores$3.apply(TaskStorageManager.scala:257)
>     at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>     at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
>     at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
>     at org.apache.samza.storage.TaskStorageManager.restoreStores(TaskStorageManager.scala:257)
>     at org.apache.samza.storage.TaskStorageManager.init(TaskStorageManager.scala:88)
>     at org.apache.samza.container.TaskInstance.startStores(TaskInstance.scala:107)
>     at org.apache.samza.container.SamzaContainer$$anonfun$startStores$2.apply(SamzaContainer.scala:846)
>     at org.apache.samza.container.SamzaContainer$$anonfun$startStores$2.apply(SamzaContainer.scala:844)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> {code}
> By the time StateStoreGC proceeds to clean up the store, the global state has moved on: the container is already restoring that store.
> Since all three cleanup conditions were met when it checked, StateStoreGC deletes the local store, thereby failing the Samza container.
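
One way to read the issue title is that restore should leave an offset file in the store directory, so that a store being restored no longer looks stale to StateStoreGC. The Java sketch below shows only that idea and is not the change made for this issue. The class RestoreOffsetFileSketch, the method writeOffsetFile, the file name OFFSET, and the plain-text offset format are all assumptions, and which offset counts as "valid" during restore is left open.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical sketch: write an offset file into the task store directory before restore starts.
final class RestoreOffsetFileSketch {
    // Assumed offset file name, used here for illustration only.
    static final String OFFSET_FILE_NAME = "OFFSET";

    // Writes the offset to a temporary file in the same directory, then moves it into place,
    // so a reader is less likely to observe a partially written offset file.
    static Path writeOffsetFile(Path taskStoreDir, String offset) throws IOException {
        Files.createDirectories(taskStoreDir);
        Path tmp = Files.createTempFile(taskStoreDir, "offset", ".tmp");
        Files.write(tmp, offset.getBytes(StandardCharsets.UTF_8));
        Path target = taskStoreDir.resolve(OFFSET_FILE_NAME);
        return Files.move(tmp, target, StandardCopyOption.REPLACE_EXISTING);
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("Partition_231");
        Path written = writeOffsetFile(dir, "0");
        System.out.println("Wrote offset file: " + written);
    }
}
```

A marker file narrows the window but does not close it: a cleanup pass that evaluated its conditions before the file was written could still delete the directory afterwards.
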



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)