You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by na...@apache.org on 2017/08/18 21:38:55 UTC

[1/2] samza git commit: SAMZA-1378: Introduce and Implement Scheduler Interface for Polling in Azure

Repository: samza
Updated Branches:
  refs/heads/master 1d253c757 -> c3b447ecb


http://git-wip-us.apache.org/repos/asf/samza/blob/c3b447ec/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/JMVersionUpgradeScheduler.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/JMVersionUpgradeScheduler.java b/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/JMVersionUpgradeScheduler.java
new file mode 100644
index 0000000..ded014f
--- /dev/null
+++ b/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/JMVersionUpgradeScheduler.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator.scheduler;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import org.apache.samza.coordinator.data.BarrierState;
+import org.apache.samza.util.BlobUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Scheduler invoked by each processor to check for job model version upgrades on the blob.
+ * Checks every 5 seconds.
+ * The processor polls the leader blob in order to track this.
+ * All time units are in SECONDS.
+ */
+public class JMVersionUpgradeScheduler implements TaskScheduler {
+  private static final Logger LOG = LoggerFactory.getLogger(JMVersionUpgradeScheduler.class);
+  private static final long JMV_UPGRADE_DELAY_SEC = 5;
+  private static final ThreadFactory
+      PROCESSOR_THREAD_FACTORY = new ThreadFactoryBuilder().setNameFormat("JMVersionUpgradeScheduler-%d").build();
+  private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(PROCESSOR_THREAD_FACTORY);
+  private final BlobUtils blob;
+  private final AtomicReference<String> currentJMVersion;
+  private final AtomicBoolean versionUpgradeDetected;
+  private final String processorId;
+  private final Consumer<String> errorHandler;
+  private SchedulerStateChangeListener listener = null;
+
+  public JMVersionUpgradeScheduler(Consumer<String> errorHandler, BlobUtils blob,
+      AtomicReference<String> currentJMVersion, AtomicBoolean versionUpgradeDetected, String processorId) {
+    this.blob = blob;
+    this.currentJMVersion = currentJMVersion;
+    this.versionUpgradeDetected = versionUpgradeDetected;
+    this.processorId = processorId;
+    this.errorHandler = errorHandler;
+  }
+
+  @Override
+  public ScheduledFuture scheduleTask() {
+    return scheduler.scheduleWithFixedDelay(() -> {
+        try {
+          LOG.info("Checking for job model version upgrade");
+          // Read job model version from the blob.
+          String blobJMV = blob.getJobModelVersion();
+          LOG.info("Job Model Version seen on the blob: {}", blobJMV);
+          String blobBarrierState = blob.getBarrierState();
+          String currentJMV = currentJMVersion.get();
+          LOG.info("Current Job Model Version that the job coordinator is working on: {}", currentJMV);
+          String expectedBarrierState = BarrierState.START.toString() + " " + blobJMV;
+          List<String> processorList = blob.getLiveProcessorList();
+          // Check if the job model version on the blob is consistent with the job model version that the processor is operating on.
+          if (processorList != null && processorList.contains(processorId) && !currentJMV.equals(blobJMV) && blobBarrierState.equals(expectedBarrierState) && !versionUpgradeDetected.get()) {
+            listener.onStateChange();
+          }
+        } catch (Exception e) {
+          errorHandler.accept("Exception in Job Model Version Upgrade Scheduler. Stopping the processor...");
+        }
+      }, JMV_UPGRADE_DELAY_SEC, JMV_UPGRADE_DELAY_SEC, TimeUnit.SECONDS);
+  }
+
+  @Override
+  public void setStateChangeListener(SchedulerStateChangeListener listener) {
+    this.listener = listener;
+  }
+
+  @Override
+  public void shutdown() {
+    LOG.info("Shutting down JMVersionUpgradeScheduler Scheduler.");
+    scheduler.shutdownNow();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/c3b447ec/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/LeaderBarrierCompleteScheduler.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/LeaderBarrierCompleteScheduler.java b/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/LeaderBarrierCompleteScheduler.java
new file mode 100644
index 0000000..7386fa9
--- /dev/null
+++ b/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/LeaderBarrierCompleteScheduler.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator.scheduler;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import org.apache.samza.coordinator.data.ProcessorEntity;
+import org.apache.samza.util.TableUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Scheduler class invoked by the leader to check if the barrier has completed.
+ * Checks every 15 seconds.
+ * The leader polls the Azure processor table in order to track this.
+ * The barrier is completed if all processors that are listed alive on the blob, have entries in the Azure table with the new job model version.
+ * All time units are in SECONDS.
+ */
+public class LeaderBarrierCompleteScheduler implements TaskScheduler {
+
+  private static final Logger LOG = LoggerFactory.getLogger(LeaderBarrierCompleteScheduler.class);
+  private static final long BARRIER_REACHED_DELAY_SEC = 5;
+  private static final long BARRIER_TIMEOUT_SEC = 30;
+  private static final ThreadFactory
+      PROCESSOR_THREAD_FACTORY = new ThreadFactoryBuilder().setNameFormat("LeaderBarrierCompleteScheduler-%d").build();
+  private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(PROCESSOR_THREAD_FACTORY);
+  private final TableUtils table;
+  private final String nextJMVersion;
+  private final Set<String> blobProcessorSet;
+  private final long startTime;
+  private final AtomicBoolean barrierTimeout;
+  private final Consumer<String> errorHandler;
+  private final String processorId;
+  private final AtomicReference<String> currentJMVersion;
+  private SchedulerStateChangeListener listener = null;
+
+  public LeaderBarrierCompleteScheduler(Consumer<String> errorHandler, TableUtils table, String nextJMVersion,
+      List<String> blobProcessorList, long startTime, AtomicBoolean barrierTimeout, AtomicReference<String> currentJMVersion, final String pid) {
+    this.table = table;
+    this.nextJMVersion = nextJMVersion;
+    this.blobProcessorSet = new HashSet<>(blobProcessorList);
+    this.startTime = startTime;
+    this.barrierTimeout = barrierTimeout;
+    this.errorHandler = errorHandler;
+    this.processorId = pid;
+    this.currentJMVersion = currentJMVersion;
+  }
+
+  @Override
+  public ScheduledFuture scheduleTask() {
+    return scheduler.scheduleWithFixedDelay(() -> {
+        try {
+          if (!table.getEntity(currentJMVersion.get(), processorId).getIsLeader()) {
+            LOG.info("Not the leader anymore. Shutting down LeaderBarrierCompleteScheduler.");
+            barrierTimeout.getAndSet(true);
+            listener.onStateChange();
+          } else {
+            LOG.info("Leader checking for barrier state");
+            // Get processor IDs listed in the table that have the new job model verion.
+            Iterable<ProcessorEntity> tableList = table.getEntitiesWithPartition(nextJMVersion);
+            Set<String> tableProcessors = new HashSet<>();
+            for (ProcessorEntity entity : tableList) {
+              tableProcessors.add(entity.getRowKey());
+            }
+            LOG.info("List of live processors as seen on the blob = {}", blobProcessorSet);
+            LOG.info("List of live processors as seen in the table = {}", tableProcessors);
+            if ((System.currentTimeMillis() - startTime) > (BARRIER_TIMEOUT_SEC * 1000)) {
+              barrierTimeout.getAndSet(true);
+              listener.onStateChange();
+            } else if (blobProcessorSet.equals(tableProcessors)) {
+              listener.onStateChange();
+            }
+          }
+        } catch (Exception e) {
+          errorHandler.accept("Exception in LeaderBarrierCompleteScheduler. Stopping the processor...");
+        }
+      }, BARRIER_REACHED_DELAY_SEC, BARRIER_REACHED_DELAY_SEC, TimeUnit.SECONDS);
+  }
+
+  @Override
+  public void setStateChangeListener(SchedulerStateChangeListener listener) {
+    this.listener = listener;
+  }
+
+  @Override
+  public void shutdown() {
+    LOG.info("Shutting down LeaderBarrierCompleteScheduler Scheduler.");
+    scheduler.shutdownNow();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/c3b447ec/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/LeaderLivenessCheckScheduler.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/LeaderLivenessCheckScheduler.java b/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/LeaderLivenessCheckScheduler.java
new file mode 100644
index 0000000..e0fa448
--- /dev/null
+++ b/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/LeaderLivenessCheckScheduler.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator.scheduler;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import org.apache.samza.util.BlobUtils;
+import org.apache.samza.coordinator.data.ProcessorEntity;
+import org.apache.samza.util.TableUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Scheduler class invoked by each processor to check if the leader is alive.
+ * Checks every 30 seconds.
+ * If a processor row hasn't been updated since 30 seconds, the system assumes that the processor has died.
+ * All time units are in SECONDS.
+ */
+public class LeaderLivenessCheckScheduler implements TaskScheduler {
+  private static final Logger LOG = LoggerFactory.getLogger(LeaderLivenessCheckScheduler.class);
+  private static final long LIVENESS_CHECK_DELAY_SEC = 10;
+  private static final long LIVENESS_DEBOUNCE_TIME_SEC = 30;
+  private static final ThreadFactory
+      PROCESSOR_THREAD_FACTORY = new ThreadFactoryBuilder().setNameFormat("LeaderLivenessCheckScheduler-%d").build();
+  private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(PROCESSOR_THREAD_FACTORY);
+  private final TableUtils table;
+  private final AtomicReference<String> currentJMVersion;
+  private final BlobUtils blob;
+  private final Consumer<String> errorHandler;
+  private final String initialState;
+  private SchedulerStateChangeListener listener = null;
+
+  public LeaderLivenessCheckScheduler(Consumer<String> errorHandler, TableUtils table, BlobUtils blob, AtomicReference<String> currentJMVersion, String initialState) {
+    this.table = table;
+    this.blob = blob;
+    this.currentJMVersion = currentJMVersion;
+    this.initialState = initialState;
+    this.errorHandler = errorHandler;
+  }
+
+  @Override
+  public ScheduledFuture scheduleTask() {
+    return scheduler.scheduleWithFixedDelay(() -> {
+        try {
+          LOG.info("Checking for leader liveness");
+          if (!checkIfLeaderAlive()) {
+            listener.onStateChange();
+          }
+        } catch (Exception e) {
+          errorHandler.accept("Exception in Leader Liveness Check Scheduler. Stopping the processor...");
+        }
+      }, LIVENESS_CHECK_DELAY_SEC, LIVENESS_CHECK_DELAY_SEC, TimeUnit.SECONDS);
+  }
+
+  @Override
+  public void setStateChangeListener(SchedulerStateChangeListener listener) {
+    this.listener = listener;
+  }
+
+  private boolean checkIfLeaderAlive() {
+    String currJMV = currentJMVersion.get();
+    String blobJMV = blob.getJobModelVersion();
+    //Get the leader processor row from the table.
+    Iterable<ProcessorEntity> tableList = table.getEntitiesWithPartition(currJMV);
+    ProcessorEntity leader = null, nextLeader = null;
+    for (ProcessorEntity entity: tableList) {
+      if (entity.getIsLeader()) {
+        leader = entity;
+        break;
+      }
+    }
+    int currJMVInt = 0;
+    if (!currJMV.equals(initialState)) {
+      currJMVInt = Integer.valueOf(currJMV);
+    }
+    if (Integer.valueOf(blobJMV) > currJMVInt) {
+      for (ProcessorEntity entity : table.getEntitiesWithPartition(blobJMV)) {
+        if (entity.getIsLeader()) {
+          nextLeader = entity;
+          break;
+        }
+      }
+    }
+    // Check if row hasn't been updated since 30 seconds.
+    if ((leader == null || (System.currentTimeMillis() - leader.getTimestamp().getTime() >= (
+        LIVENESS_DEBOUNCE_TIME_SEC * 1000))) && nextLeader == null) {
+      return false;
+    }
+    return true;
+  }
+
+  @Override
+  public void shutdown() {
+    LOG.info("Shutting down LeaderLivenessCheckScheduler Scheduler.");
+    scheduler.shutdownNow();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/c3b447ec/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/LivenessCheckScheduler.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/LivenessCheckScheduler.java b/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/LivenessCheckScheduler.java
new file mode 100644
index 0000000..d4715f3
--- /dev/null
+++ b/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/LivenessCheckScheduler.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator.scheduler;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import org.apache.samza.util.BlobUtils;
+import org.apache.samza.util.TableUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Scheduler class invoked by the leader to check for changes in the list of live processors.
+ * Checks every 30 seconds.
+ * If a processor row hasn't been updated since 30 seconds, the system assumes that the processor has died.
+ * All time units are in SECONDS.
+ */
+public class LivenessCheckScheduler implements TaskScheduler {
+
+  private static final Logger LOG = LoggerFactory.getLogger(LivenessCheckScheduler.class);
+  private static final long LIVENESS_CHECK_DELAY_SEC = 5;
+  private static final ThreadFactory
+      PROCESSOR_THREAD_FACTORY = new ThreadFactoryBuilder().setNameFormat("LivenessCheckScheduler-%d").build();
+  private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(PROCESSOR_THREAD_FACTORY);
+  private final TableUtils table;
+  private final BlobUtils blob;
+  private final AtomicReference<String> currentJMVersion;
+  private final AtomicReference<List<String>> liveProcessorsList = new AtomicReference<>(null);
+  private final Consumer<String> errorHandler;
+  private SchedulerStateChangeListener listener = null;
+  private final String processorId;
+
+  public LivenessCheckScheduler(Consumer<String> errorHandler, TableUtils table, BlobUtils blob, AtomicReference<String> currentJMVersion, final String pid) {
+    this.table = table;
+    this.blob = blob;
+    this.currentJMVersion = currentJMVersion;
+    this.errorHandler = errorHandler;
+    this.processorId = pid;
+  }
+
+  @Override
+  public ScheduledFuture scheduleTask() {
+    return scheduler.scheduleWithFixedDelay(() -> {
+        try {
+          if (!table.getEntity(currentJMVersion.get(), processorId).getIsLeader()) {
+            LOG.info("Not the leader anymore. Shutting down LivenessCheckScheduler.");
+            scheduler.shutdownNow();
+            return;
+          }
+          LOG.info("Checking for list of live processors");
+          //Get the list of live processors published on the blob.
+          Set<String> currProcessors = new HashSet<>(blob.getLiveProcessorList());
+          //Get the list of live processors from the table. This is the current system state.
+          Set<String> liveProcessors = table.getActiveProcessorsList(currentJMVersion);
+          //Invoke listener if the table list is not consistent with the blob list.
+          if (!liveProcessors.equals(currProcessors)) {
+            liveProcessorsList.getAndSet(new ArrayList<>(liveProcessors));
+            listener.onStateChange();
+          }
+        } catch (Exception e) {
+          errorHandler.accept("Exception in Liveness Check Scheduler. Stopping the processor...");
+        }
+      }, LIVENESS_CHECK_DELAY_SEC, LIVENESS_CHECK_DELAY_SEC, TimeUnit.SECONDS);
+  }
+
+  @Override
+  public void setStateChangeListener(SchedulerStateChangeListener listener) {
+    this.listener = listener;
+  }
+
+  public AtomicReference<List<String>> getLiveProcessors() {
+    return liveProcessorsList;
+  }
+
+  @Override
+  public void shutdown() {
+    LOG.info("Shutting down LivenessCheckScheduler Scheduler.");
+    scheduler.shutdownNow();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/c3b447ec/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/RenewLeaseScheduler.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/RenewLeaseScheduler.java b/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/RenewLeaseScheduler.java
new file mode 100644
index 0000000..f158122
--- /dev/null
+++ b/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/RenewLeaseScheduler.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator.scheduler;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import org.apache.samza.util.LeaseBlobManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Scheduler class to keep renewing the lease once an entity has acquired it.
+ * Renews every 45 seconds.
+ * All time units are in SECONDS.
+ */
+public class RenewLeaseScheduler implements TaskScheduler {
+
+  private static final Logger LOG = LoggerFactory.getLogger(RenewLeaseScheduler.class);
+  private static final long RENEW_LEASE_DELAY_SEC = 45;
+  private static final ThreadFactory
+      PROCESSOR_THREAD_FACTORY = new ThreadFactoryBuilder().setNameFormat("RenewLeaseScheduler-%d").build();
+  private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(PROCESSOR_THREAD_FACTORY);
+  private final LeaseBlobManager leaseBlobManager;
+  private final AtomicReference<String> leaseId;
+  private final Consumer<String> errorHandler;
+
+  public RenewLeaseScheduler(Consumer<String> errorHandler, LeaseBlobManager leaseBlobManager, AtomicReference<String> leaseId) {
+    this.leaseBlobManager = leaseBlobManager;
+    this.leaseId = leaseId;
+    this.errorHandler = errorHandler;
+  }
+
+  @Override
+  public ScheduledFuture scheduleTask() {
+    return scheduler.scheduleWithFixedDelay(() -> {
+        try {
+          LOG.info("Renewing lease");
+          boolean status = leaseBlobManager.renewLease(leaseId.get());
+          if (!status) {
+            errorHandler.accept("Unable to renew lease. Continuing as non-leader.");
+          }
+        } catch (Exception e) {
+          errorHandler.accept("Exception in Renew Lease Scheduler. Continuing as non-leader.");
+        }
+      }, RENEW_LEASE_DELAY_SEC, RENEW_LEASE_DELAY_SEC, TimeUnit.SECONDS);
+  }
+
+  @Override
+  public void setStateChangeListener(SchedulerStateChangeListener listener) {}
+
+  @Override
+  public void shutdown() {
+    LOG.info("Shutting down RenewLeaseScheduler Scheduler.");
+    scheduler.shutdownNow();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/c3b447ec/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/SchedulerStateChangeListener.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/SchedulerStateChangeListener.java b/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/SchedulerStateChangeListener.java
new file mode 100644
index 0000000..95fc4e1
--- /dev/null
+++ b/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/SchedulerStateChangeListener.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator.scheduler;
+
+/**
+ * Listener interface for Azure Job Coordinator, to track state changes and take necessary actions.
+ */
+public interface SchedulerStateChangeListener {
+
+  void onStateChange();
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/c3b447ec/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/TaskScheduler.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/TaskScheduler.java b/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/TaskScheduler.java
new file mode 100644
index 0000000..63d6e24
--- /dev/null
+++ b/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/TaskScheduler.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator.scheduler;
+
+import java.util.concurrent.ScheduledFuture;
+
+
+/**
+ * Interface for scheduling tasks for Azure Job Coordinator.
+ */
+public interface TaskScheduler {
+
+  ScheduledFuture scheduleTask();
+
+  void setStateChangeListener(SchedulerStateChangeListener listener);
+
+  void shutdown();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/c3b447ec/samza-azure/src/main/java/org/apache/samza/util/BlobUtils.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/util/BlobUtils.java b/samza-azure/src/main/java/org/apache/samza/util/BlobUtils.java
new file mode 100644
index 0000000..85e4273
--- /dev/null
+++ b/samza-azure/src/main/java/org/apache/samza/util/BlobUtils.java
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.util;
+
+import com.microsoft.azure.storage.AccessCondition;
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.CloudBlobClient;
+import com.microsoft.azure.storage.blob.CloudBlobContainer;
+import com.microsoft.azure.storage.blob.CloudPageBlob;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URISyntaxException;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.samza.AzureClient;
+import org.apache.samza.AzureException;
+import org.apache.samza.coordinator.data.JobModelBundle;
+import org.apache.samza.SamzaException;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.serializers.model.SamzaObjectMapper;
+import org.eclipse.jetty.http.HttpStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Client side class that has reference to Azure blob storage.
+ * Used for writing and reading from the blob.
+ * Every write requires a valid lease ID.
+ */
+public class BlobUtils {
+
+  private static final Logger LOG = LoggerFactory.getLogger(BlobUtils.class);
+  private static final long JOB_MODEL_BLOCK_SIZE = 1024000;
+  private static final long BARRIER_STATE_BLOCK_SIZE = 1024;
+  private static final long PROCESSOR_LIST_BLOCK_SIZE = 1024;
+  private CloudBlobClient blobClient;
+  private CloudBlobContainer container;
+  private CloudPageBlob blob;
+
+  /**
+   * Creates an object of BlobUtils. It creates the container and page blob if they don't exist already.
+   * @param client Client handle for access to Azure Storage account.
+   * @param containerName Name of container inside which we want the blob to reside.
+   * @param blobName Name of the blob to be managed.
+   * @param length Length of the page blob.
+   * @throws AzureException If an Azure storage service error occurred, or when the container name or blob name is invalid.
+   */
+  public BlobUtils(AzureClient client, String containerName, String blobName, long length) {
+    this.blobClient = client.getBlobClient();
+    try {
+      this.container = blobClient.getContainerReference(containerName);
+      container.createIfNotExists();
+      this.blob = container.getPageBlobReference(blobName);
+      if (!blob.exists()) {
+        blob.create(length, AccessCondition.generateIfNotExistsCondition(), null, null);
+      }
+    } catch (URISyntaxException e) {
+      LOG.error("Container name: " + containerName + " or blob name: " + blobName + " invalid.", e);
+      throw new AzureException(e);
+    } catch (StorageException e) {
+      int httpStatusCode = e.getHttpStatusCode();
+      if (httpStatusCode == HttpStatus.CONFLICT_409) {
+        LOG.info("The blob you're trying to create exists already.", e);
+      } else {
+        LOG.error("Azure Storage Exception!", e);
+        throw new AzureException(e);
+      }
+    }
+  }
+
+  /**
+   * Writes the job model to the blob.
+   * Write is successful only if the lease ID passed is valid and the processor holds the lease.
+   * Called by the leader.
+   * @param prevJM Previous job model version that the processor was operating on.
+   * @param currJM Current job model version that the processor is operating on.
+   * @param prevJMV Previous job model version that the processor was operating on.
+   * @param currJMV Current job model version that the processor is operating on.
+   * @param leaseId LeaseID of the lease that the processor holds on the blob. Null if there is no lease.
+   * @return true if write to the blob is successful, false if leaseID is null or an Azure storage service error or IO exception occurred.
+   */
+  public boolean publishJobModel(JobModel prevJM, JobModel currJM, String prevJMV, String currJMV, String leaseId) {
+    try {
+      if (leaseId == null) {
+        return false;
+      }
+      JobModelBundle bundle = new JobModelBundle(prevJM, currJM, prevJMV, currJMV);
+      byte[] data = SamzaObjectMapper.getObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsBytes(bundle);
+      byte[] pageData = Arrays.copyOf(data, (int) JOB_MODEL_BLOCK_SIZE);
+      InputStream is = new ByteArrayInputStream(pageData);
+      blob.uploadPages(is, 0, JOB_MODEL_BLOCK_SIZE, AccessCondition.generateLeaseCondition(leaseId), null, null);
+      LOG.info("Uploaded {} jobModel to blob", bundle.getCurrJobModel());
+      return true;
+    } catch (StorageException | IOException e) {
+      LOG.error("JobModel publish failed for version = " + currJMV, e);
+      return false;
+    }
+  }
+
+  /**
+   * Reads the current job model from the blob.
+   * @return The current job model published on the blob. Returns null when job model details not found on the blob.
+   * @throws AzureException in getJobModelBundle() if an Azure storage service error occurred.
+   * @throws SamzaException in getJobModelBundle() if data retrieved from blob could not be parsed by SamzaObjectMapper.
+   */
+  public JobModel getJobModel() {
+    LOG.info("Reading the job model from blob.");
+    JobModelBundle jmBundle = getJobModelBundle();
+    if (jmBundle == null) {
+      LOG.error("Job Model details don't exist on the blob.");
+      return null;
+    }
+    JobModel jm = jmBundle.getCurrJobModel();
+    return jm;
+  }
+
+  /**
+   * Reads the current job model version from the blob .
+   * @return Current job model version published on the blob. Returns null when job model details not found on the blob.
+   * @throws AzureException in getJobModelBundle() if an Azure storage service error occurred.
+   * @throws SamzaException in getJobModelBundle() if data retrieved from blob could not be parsed by SamzaObjectMapper.
+   */
+  public String getJobModelVersion() {
+    LOG.info("Reading the job model version from blob.");
+    JobModelBundle jmBundle = getJobModelBundle();
+    if (jmBundle == null) {
+      LOG.error("Job Model details don't exist on the blob.");
+      return null;
+    }
+    String jmVersion = jmBundle.getCurrJobModelVersion();
+    return jmVersion;
+  }
+
+  /**
+   * Writes the barrier state to the blob.
+   * Write is successful only if the lease ID passed is valid and the processor holds the lease.
+   * Called only by the leader.
+   * @param state Barrier state to be published to the blob.
+   * @param leaseId LeaseID of the valid lease that the processor holds on the blob. Null if there is no lease.
+   * @return true if write to the blob is successful, false if leaseID is null or an Azure storage service error or IO exception occurred.
+   */
+  public boolean publishBarrierState(String state, String leaseId) {
+    try {
+      if (leaseId == null) {
+        return false;
+      }
+      byte[] data = SamzaObjectMapper.getObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsBytes(state);
+      byte[] pageData = Arrays.copyOf(data, (int) BARRIER_STATE_BLOCK_SIZE);
+      InputStream is = new ByteArrayInputStream(pageData);
+
+      //uploadPages is only successful when the AccessCondition provided has an active and valid lease ID. It fails otherwise.
+      blob.uploadPages(is, JOB_MODEL_BLOCK_SIZE, BARRIER_STATE_BLOCK_SIZE, AccessCondition.generateLeaseCondition(leaseId), null, null);
+      LOG.info("Uploaded barrier state {} to blob", state);
+      return true;
+    } catch (StorageException | IOException e) {
+      LOG.error("Barrier state " + state + " publish failed", e);
+      return false;
+    }
+  }
+
+  /**
+   * Reads the current barrier state from the blob.
+   * @return Barrier state published on the blob.
+   * @throws AzureException If an Azure storage service error occurred.
+   * @throws SamzaException If data retrieved from blob could not be parsed by SamzaObjectMapper.
+   */
+  public String getBarrierState() {
+    LOG.info("Reading the barrier state from blob.");
+    byte[] data = new byte[(int) BARRIER_STATE_BLOCK_SIZE];
+    try {
+      blob.downloadRangeToByteArray(JOB_MODEL_BLOCK_SIZE, BARRIER_STATE_BLOCK_SIZE, data, 0);
+    } catch (StorageException e) {
+      LOG.error("Failed to read barrier state from blob.", e);
+      throw new AzureException(e);
+    }
+    String state;
+    try {
+      state = SamzaObjectMapper.getObjectMapper().readValue(data, String.class);
+    } catch (IOException e) {
+      LOG.error("Failed to parse byte data: " + data + " for barrier state retrieved from the blob.", e);
+      throw new SamzaException(e);
+    }
+    return state;
+  }
+
+  /**
+   * Writes the list of live processors in the system to the blob.
+   * Write is successful only if the lease ID passed is valid and the processor holds the lease.
+   * Called only by the leader.
+   * @param processors List of live processors to be published on the blob.
+   * @param leaseId LeaseID of the valid lease that the processor holds on the blob. Null if there is no lease.
+   * @return true if write to the blob is successful, false if leaseID is null or an Azure storage service error or IO exception occurred.
+   */
+  public boolean publishLiveProcessorList(List<String> processors, String leaseId) {
+    try {
+      if (leaseId == null) {
+        return false;
+      }
+      byte[] data = SamzaObjectMapper.getObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsBytes(processors);
+      byte[] pageData = Arrays.copyOf(data, (int) BARRIER_STATE_BLOCK_SIZE);
+      InputStream is = new ByteArrayInputStream(pageData);
+      blob.uploadPages(is, JOB_MODEL_BLOCK_SIZE + BARRIER_STATE_BLOCK_SIZE, PROCESSOR_LIST_BLOCK_SIZE, AccessCondition.generateLeaseCondition(leaseId), null, null);
+      LOG.info("Uploaded list of live processors to blob.");
+      return true;
+    } catch (StorageException | IOException e) {
+      LOG.error("Processor list: " + processors + "publish failed", e);
+      return false;
+    }
+  }
+
+  /**
+   * Reads the list of live processors published on the blob.
+   * @return String list of live processors.
+   * @throws AzureException If an Azure storage service error occurred.
+   * @throws SamzaException If data retrieved from blob could not be parsed by SamzaObjectMapper.
+   */
+  public List<String> getLiveProcessorList() {
+    LOG.info("Read the the list of live processors from blob.");
+    byte[] data = new byte[(int) PROCESSOR_LIST_BLOCK_SIZE];
+    try {
+      blob.downloadRangeToByteArray(JOB_MODEL_BLOCK_SIZE + BARRIER_STATE_BLOCK_SIZE, PROCESSOR_LIST_BLOCK_SIZE, data, 0);
+    } catch (StorageException e) {
+      LOG.error("Failed to read the list of live processors from the blob.", new AzureException(e));
+      throw new AzureException(e);
+    }
+    List<String> list;
+    try {
+      list = SamzaObjectMapper.getObjectMapper().readValue(data, List.class);
+    } catch (IOException e) {
+      LOG.error("Failed to parse byte data: " + data + " for live processor list retrieved from the blob", new SamzaException(e));
+      throw new SamzaException(e);
+    }
+    return list;
+  }
+
+  public CloudBlobClient getBlobClient() {
+    return this.blobClient;
+  }
+
+  public CloudBlobContainer getBlobContainer() {
+    return this.container;
+  }
+
+  public CloudPageBlob getBlob() {
+    return this.blob;
+  }
+
+  private JobModelBundle getJobModelBundle() {
+    byte[] data = new byte[(int) JOB_MODEL_BLOCK_SIZE];
+    try {
+      blob.downloadRangeToByteArray(0, JOB_MODEL_BLOCK_SIZE, data, 0);
+    } catch (StorageException e) {
+      LOG.error("Failed to read JobModel details from the blob.", e);
+      throw new AzureException(e);
+    }
+    try {
+      JobModelBundle jmBundle = SamzaObjectMapper.getObjectMapper().readValue(data, JobModelBundle.class);
+      return jmBundle;
+    } catch (IOException e) {
+      LOG.error("Failed to parse byte data: " + data + " for JobModel details retrieved from the blob", e);
+      throw new SamzaException(e);
+    }
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/c3b447ec/samza-azure/src/main/java/org/apache/samza/util/LeaseBlobManager.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/util/LeaseBlobManager.java b/samza-azure/src/main/java/org/apache/samza/util/LeaseBlobManager.java
new file mode 100644
index 0000000..dffb7ae
--- /dev/null
+++ b/samza-azure/src/main/java/org/apache/samza/util/LeaseBlobManager.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.util;
+
+import com.microsoft.azure.storage.AccessCondition;
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.CloudPageBlob;
+import org.apache.samza.AzureException;
+import org.eclipse.jetty.http.HttpStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Helper class for lease blob operations.
+ */
+public class LeaseBlobManager {
+
+  private static final Logger LOG = LoggerFactory.getLogger(LeaseBlobManager.class);
+  private final CloudPageBlob leaseBlob;
+
+  public LeaseBlobManager(CloudPageBlob leaseBlob) {
+    this.leaseBlob = leaseBlob;
+  }
+
+  /**
+   * Acquires a lease on a blob. The lease ID is NULL initially.
+   * @param leaseTimeInSec The time in seconds you want to acquire the lease for.
+   * @param leaseId Proposed ID you want to acquire the lease with, null if not proposed.
+   * @return String that represents lease ID.  Null if acquireLease is unsuccessful because the blob is leased already.
+   * @throws AzureException If a Azure storage service error occurred. This includes the case where the blob you're trying to lease does not exist.
+   */
+  public String acquireLease(int leaseTimeInSec, String leaseId) {
+    try {
+      String id = leaseBlob.acquireLease(leaseTimeInSec, leaseId);
+      LOG.info("Acquired lease with lease id = " + id);
+      return id;
+    } catch (StorageException storageException) {
+      int httpStatusCode = storageException.getHttpStatusCode();
+      if (httpStatusCode == HttpStatus.CONFLICT_409) {
+        LOG.info("The blob you're trying to acquire is leased already.", storageException.getMessage());
+      } else if (httpStatusCode == HttpStatus.NOT_FOUND_404) {
+        LOG.error("The blob you're trying to lease does not exist.", storageException);
+        throw new AzureException(storageException);
+      } else {
+        LOG.error("Error acquiring lease!", storageException);
+        throw new AzureException(storageException);
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Renews the lease on the blob.
+   * @param leaseId ID of the lease to be renewed.
+   * @return True if lease was renewed successfully, false otherwise.
+   */
+  public boolean renewLease(String leaseId) {
+    try {
+      leaseBlob.renewLease(AccessCondition.generateLeaseCondition(leaseId));
+      return true;
+    } catch (StorageException storageException) {
+      LOG.error("Wasn't able to renew lease with lease id: " + leaseId, storageException);
+      return false;
+    }
+  }
+
+  /**
+   * Releases the lease on the blob.
+   * @param leaseId ID of the lease to be released.
+   * @return True if released successfully, false otherwise.
+   */
+  public boolean releaseLease(String leaseId) {
+    try {
+      leaseBlob.releaseLease(AccessCondition.generateLeaseCondition(leaseId));
+      return true;
+    } catch (StorageException storageException) {
+      LOG.error("Wasn't able to release lease with lease id: " + leaseId, storageException);
+      return false;
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/c3b447ec/samza-azure/src/main/java/org/apache/samza/util/TableUtils.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/util/TableUtils.java b/samza-azure/src/main/java/org/apache/samza/util/TableUtils.java
new file mode 100644
index 0000000..f49ce27
--- /dev/null
+++ b/samza-azure/src/main/java/org/apache/samza/util/TableUtils.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.util;
+
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.table.CloudTable;
+import com.microsoft.azure.storage.table.CloudTableClient;
+import com.microsoft.azure.storage.table.TableOperation;
+import com.microsoft.azure.storage.table.TableQuery;
+import java.net.URISyntaxException;
+import java.util.HashSet;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.samza.AzureClient;
+import org.apache.samza.AzureException;
+import org.apache.samza.coordinator.data.ProcessorEntity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ *  Client side class that has a reference to Azure Table Storage.
+ *  Enables the user to add or delete information from the table, make updates to the table and retrieve information from the table.
+ *  Every row in a table is uniquely identified by a combination of the PARTIITON KEY and ROW KEY.
+ *  PARTITION KEY = Group ID = Job Model Version (for this case).
+ *  ROW KEY = Unique entity ID for a group = Processor ID (for this case).
+ */
+public class TableUtils {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TableUtils.class);
+  private static final String PARTITION_KEY = "PartitionKey";
+  private static final long LIVENESS_DEBOUNCE_TIME_SEC = 30;
+  private final String initialState;
+  private final CloudTableClient tableClient;
+  private final CloudTable table;
+
+  public TableUtils(AzureClient client, String tableName, String initialState) {
+    this.initialState = initialState;
+    tableClient = client.getTableClient();
+    try {
+      table = tableClient.getTableReference(tableName);
+      table.createIfNotExists();
+    } catch (URISyntaxException e) {
+      LOG.error("\nConnection string specifies an invalid URI.", e);
+      throw new AzureException(e);
+    } catch (StorageException e) {
+      LOG.error("Azure storage exception.", e);
+      throw new AzureException(e);
+    }
+  }
+
+  /**
+   * Add a row which denotes an active processor to the processor table.
+   * @param jmVersion Job model version that the processor is operating on.
+   * @param pid Unique processor ID.
+   * @param isLeader Denotes whether the processor is a leader or not.
+   * @throws AzureException If an Azure storage service error occurred.
+   */
+  public void addProcessorEntity(String jmVersion, String pid, boolean isLeader) {
+    ProcessorEntity entity = new ProcessorEntity(jmVersion, pid);
+    entity.setIsLeader(isLeader);
+    entity.updateLiveness();
+    TableOperation add = TableOperation.insert(entity);
+    try {
+      table.execute(add);
+    } catch (StorageException e) {
+      LOG.error("Azure storage exception while adding processor entity with job model version: " + jmVersion + "and pid: " + pid, e);
+      throw new AzureException(e);
+    }
+  }
+
+  /**
+   * Retrieve a particular row in the processor table, given the partition key and the row key.
+   * @param jmVersion Job model version of the processor row to be retrieved.
+   * @param pid Unique processor ID of the processor row to be retrieved.
+   * @return An instance of required processor entity. Null if does not exist.
+   * @throws AzureException If an Azure storage service error occurred.
+   */
+  public ProcessorEntity getEntity(String jmVersion, String pid) {
+    try {
+      TableOperation retrieveEntity = TableOperation.retrieve(jmVersion, pid, ProcessorEntity.class);
+      ProcessorEntity entity = table.execute(retrieveEntity).getResultAsType();
+      return entity;
+    } catch (StorageException e) {
+      LOG.error("Azure storage exception while retrieving processor entity with job model version: " + jmVersion + "and pid: " + pid, e);
+      throw new AzureException(e);
+    }
+  }
+
+  /**
+   * Updates the liveness value of a particular processor with a randomly generated integer, which in turn updates the last modified since timestamp of the row.
+   * @param jmVersion Job model version of the processor row to be updated.
+   * @param pid Unique processor ID of the processor row to be updated.
+   */
+  public void updateHeartbeat(String jmVersion, String pid) {
+    try {
+      Random rand = new Random();
+      TableOperation retrieveEntity = TableOperation.retrieve(jmVersion, pid, ProcessorEntity.class);
+      ProcessorEntity entity = table.execute(retrieveEntity).getResultAsType();
+      entity.updateLiveness();
+      TableOperation update = TableOperation.replace(entity);
+      table.execute(update);
+    } catch (StorageException e) {
+      LOG.error("Azure storage exception while updating heartbeat for job model version: " + jmVersion + "and pid: " + pid, e);
+    }
+  }
+
+  /**
+   * Updates the isLeader value when the processor starts or stops being a leader.
+   * @param jmVersion Job model version of the processor row to be updated.
+   * @param pid Unique processor ID of the processor row to be updated.
+   * @param isLeader Denotes whether the processor is a leader or not.
+   * @throws AzureException If an Azure storage service error occurred.
+   */
+  public void updateIsLeader(String jmVersion, String pid, boolean isLeader) {
+    try {
+      TableOperation retrieveEntity = TableOperation.retrieve(jmVersion, pid, ProcessorEntity.class);
+      ProcessorEntity entity = table.execute(retrieveEntity).getResultAsType();
+      entity.setIsLeader(isLeader);
+      TableOperation update = TableOperation.replace(entity);
+      table.execute(update);
+    } catch (StorageException e) {
+      LOG.error("Azure storage exception while updating isLeader value for job model version: " + jmVersion + "and pid: " + pid, e);
+      throw new AzureException(e);
+    }
+  }
+
+  /**
+   * Deletes a specified row in the processor table.
+   * @param jmVersion Job model version of the processor row to be deleted.
+   * @param pid Unique processor ID of the processor row to be deleted.
+   * @throws AzureException If an Azure storage service error occurred.
+   */
+  public void deleteProcessorEntity(String jmVersion, String pid) {
+    try {
+      TableOperation retrieveEntity = TableOperation.retrieve(jmVersion, pid, ProcessorEntity.class);
+      ProcessorEntity entity = table.execute(retrieveEntity).getResultAsType();
+      TableOperation remove = TableOperation.delete(entity);
+      table.execute(remove);
+    } catch (StorageException e) {
+      LOG.error("Azure storage exception while deleting processor entity with job model version: " + jmVersion + "and pid: " + pid, e);
+      throw new AzureException(e);
+    }
+  }
+
+  /**
+   * Retrieve all rows in a table with the given partition key.
+   * @param partitionKey Job model version of the processors to be retrieved.
+   * @return Iterable list of processor entities.
+   */
+  public Iterable<ProcessorEntity> getEntitiesWithPartition(String partitionKey) {
+    String partitionFilter = TableQuery.generateFilterCondition(PARTITION_KEY, TableQuery.QueryComparisons.EQUAL, partitionKey);
+    TableQuery<ProcessorEntity> partitionQuery = TableQuery.from(ProcessorEntity.class).where(partitionFilter);
+    return table.execute(partitionQuery);
+  }
+
+  /**
+   * Gets the list of all active processors that are heartbeating to the processor table.
+   * @param currentJMVersion Current job model version that the processors in the application are operating on.
+   * @return List of ids of currently active processors in the application, retrieved from the processor table.
+   */
+  public Set<String> getActiveProcessorsList(AtomicReference<String> currentJMVersion) {
+    Iterable<ProcessorEntity> tableList = getEntitiesWithPartition(currentJMVersion.get());
+    Set<String> activeProcessorsList = new HashSet<>();
+    for (ProcessorEntity entity: tableList) {
+      if (System.currentTimeMillis() - entity.getTimestamp().getTime() <= (LIVENESS_DEBOUNCE_TIME_SEC * 1000)) {
+        activeProcessorsList.add(entity.getRowKey());
+      }
+    }
+
+    Iterable<ProcessorEntity> unassignedList = getEntitiesWithPartition(initialState);
+    for (ProcessorEntity entity: unassignedList) {
+      long temp = System.currentTimeMillis() - entity.getTimestamp().getTime();
+      LOG.info("Time elapsed since last heartbeat: {}", temp);
+      if (temp <= (LIVENESS_DEBOUNCE_TIME_SEC * 1000)) {
+        activeProcessorsList.add(entity.getRowKey());
+      }
+    }
+    LOG.info("Active processors list: {}", activeProcessorsList);
+    return activeProcessorsList;
+  }
+
+  public CloudTable getTable() {
+    return table;
+  }
+
+}
\ No newline at end of file


[2/2] samza git commit: SAMZA-1378: Introduce and Implement Scheduler Interface for Polling in Azure

Posted by na...@apache.org.
SAMZA-1378: Introduce and Implement Scheduler Interface for Polling in Azure

PR 1: AzureClient + AzureConfig
PR 2: LeaseBlobManager
PR 3: BlobUtils + JobModelBundle
PR 4: TableUtils + ProcessorEntity
PR 5: AzureLeaderElector
PR 6: Added all schedulers (current PR)

Author: PawasChhokra <Jaimatadi1$>
Author: PawasChhokra <pa...@gmail.com>

Reviewers: Navina Ramesh <na...@apache.org>

Closes #261 from PawasChhokra/AzureSchedulers


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/c3b447ec
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/c3b447ec
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/c3b447ec

Branch: refs/heads/master
Commit: c3b447ecb343ddc4e48296448127fda5dfafe913
Parents: 1d253c7
Author: Pawas Chhokra <pa...@gmail.com>
Authored: Fri Aug 18 14:38:46 2017 -0700
Committer: navina <na...@apache.org>
Committed: Fri Aug 18 14:38:46 2017 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/samza/AzureClient.java |  18 +-
 .../main/java/org/apache/samza/AzureConfig.java |  73 ---
 .../org/apache/samza/AzureLeaderElector.java    | 111 ----
 .../main/java/org/apache/samza/BlobUtils.java   | 280 ----------
 .../java/org/apache/samza/JobModelBundle.java   |  61 ---
 .../java/org/apache/samza/LeaseBlobManager.java |  98 ----
 .../java/org/apache/samza/ProcessorEntity.java  |  58 ---
 .../main/java/org/apache/samza/TableUtils.java  | 198 --------
 .../org/apache/samza/config/AzureConfig.java    |  68 +++
 .../samza/coordinator/AzureJobCoordinator.java  | 509 +++++++++++++++++++
 .../samza/coordinator/AzureLeaderElector.java   | 109 ++++
 .../samza/coordinator/data/BarrierState.java    |  27 +
 .../samza/coordinator/data/JobModelBundle.java  |  61 +++
 .../samza/coordinator/data/ProcessorEntity.java |  62 +++
 .../scheduler/HeartbeatScheduler.java           |  81 +++
 .../scheduler/JMVersionUpgradeScheduler.java    |  99 ++++
 .../LeaderBarrierCompleteScheduler.java         | 118 +++++
 .../scheduler/LeaderLivenessCheckScheduler.java | 120 +++++
 .../scheduler/LivenessCheckScheduler.java       | 108 ++++
 .../scheduler/RenewLeaseScheduler.java          |  79 +++
 .../scheduler/SchedulerStateChangeListener.java |  29 ++
 .../coordinator/scheduler/TaskScheduler.java    |  35 ++
 .../java/org/apache/samza/util/BlobUtils.java   | 284 +++++++++++
 .../org/apache/samza/util/LeaseBlobManager.java |  99 ++++
 .../java/org/apache/samza/util/TableUtils.java  | 205 ++++++++
 25 files changed, 2105 insertions(+), 885 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/c3b447ec/samza-azure/src/main/java/org/apache/samza/AzureClient.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/AzureClient.java b/samza-azure/src/main/java/org/apache/samza/AzureClient.java
index 2248d12..04f8fd3 100644
--- a/samza-azure/src/main/java/org/apache/samza/AzureClient.java
+++ b/samza-azure/src/main/java/org/apache/samza/AzureClient.java
@@ -25,6 +25,7 @@ import com.microsoft.azure.storage.RetryPolicy;
 import com.microsoft.azure.storage.blob.BlobRequestOptions;
 import com.microsoft.azure.storage.blob.CloudBlobClient;
 import com.microsoft.azure.storage.table.CloudTableClient;
+import com.microsoft.azure.storage.table.TableRequestOptions;
 import java.net.URISyntaxException;
 import java.security.InvalidKeyException;
 import org.slf4j.Logger;
@@ -44,21 +45,26 @@ public class AzureClient {
   /**
    * Creates a reference to the Azure Storage account according to the connection string that the client passes.
    * Also creates references to Azure Blob Storage and Azure Table Storage.
-   * @param storageConnectionString Connection string to conenct to Azure Storage Account, format: "DefaultEndpointsProtocol=<https>;AccountName=<>;AccountKey=<>"
+   * @param storageConnectionString Connection string to connect to Azure Storage Account
+   *                                Format: DefaultEndpointsProtocol=https;AccountName="Insert your account name";AccountKey="Insert your account key"
    * @throws AzureException If an Azure storage service error occurred, or when the storageConnectionString is invalid.
    */
-  AzureClient(String storageConnectionString) {
+  public AzureClient(String storageConnectionString) {
     try {
       account = CloudStorageAccount.parse(storageConnectionString);
+      RetryPolicy retryPolicy = new RetryLinearRetry(5000,  3);
 
       blobClient = account.createCloudBlobClient();
       // Set retry policy for operations on the blob. In this case, every failed operation on the blob will be retried thrice, after 5 second intervals.
-      BlobRequestOptions options = new BlobRequestOptions();
-      RetryPolicy retryPolicy = new RetryLinearRetry(5000, 3);
-      options.setRetryPolicyFactory(retryPolicy);
-      blobClient.setDefaultRequestOptions(options);
+      BlobRequestOptions blobOptions = new BlobRequestOptions();
+      blobOptions.setRetryPolicyFactory(retryPolicy);
+      blobClient.setDefaultRequestOptions(blobOptions);
 
+      // Set retry policy for operations on the table. In this case, every failed operation on the table will be retried thrice, after 5 second intervals.
       tableClient = account.createCloudTableClient();
+      TableRequestOptions tableOptions = new TableRequestOptions();
+      tableOptions.setRetryPolicyFactory(retryPolicy);
+      tableClient.setDefaultRequestOptions(tableOptions);
     } catch (IllegalArgumentException | URISyntaxException e) {
       LOG.error("Connection string {} specifies an invalid URI.", storageConnectionString);
       LOG.error("Please confirm the connection string is in the Azure connection string format.");

http://git-wip-us.apache.org/repos/asf/samza/blob/c3b447ec/samza-azure/src/main/java/org/apache/samza/AzureConfig.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/AzureConfig.java b/samza-azure/src/main/java/org/apache/samza/AzureConfig.java
deleted file mode 100644
index 47873a7..0000000
--- a/samza-azure/src/main/java/org/apache/samza/AzureConfig.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza;
-
-import org.apache.samza.config.ApplicationConfig;
-import org.apache.samza.config.Config;
-import org.apache.samza.config.ConfigException;
-import org.apache.samza.config.MapConfig;
-
-/**
- * Config class for reading all user defined parameters for Azure driven coordination services.
- */
-public class AzureConfig extends MapConfig {
-
-  // Connection string for Azure Storage Account, format: "DefaultEndpointsProtocol=<https>;AccountName=<>;AccountKey=<>"
-  public static final String AZURE_STORAGE_CONNECT = "azure.storage.connect";
-  public static final String AZURE_PAGEBLOB_LENGTH = "job.coordinator.azure.blob.length";
-  public static final long DEFAULT_AZURE_PAGEBLOB_LENGTH = 5120000;
-
-  private static String containerName;
-  private static String blobName;
-  private static String tableName;
-
-  public AzureConfig(Config config) {
-    super(config);
-    ApplicationConfig appConfig = new ApplicationConfig(config);
-    //Remove all non-alphanumeric characters from id as table name does not allow them.
-    String id = appConfig.getGlobalAppId().replaceAll("[^A-Za-z0-9]", "");
-    containerName = "samzacontainer" + id;
-    blobName = "samzablob" + id;
-    tableName = "samzatable" + id;
-  }
-
-  public String getAzureConnect() {
-    if (!containsKey(AZURE_STORAGE_CONNECT)) {
-      throw new ConfigException("Missing " + AZURE_STORAGE_CONNECT + " config!");
-    }
-    return get(AZURE_STORAGE_CONNECT);
-  }
-
-  public String getAzureContainerName() {
-    return containerName;
-  }
-
-  public String getAzureBlobName() {
-    return blobName;
-  }
-
-  public long getAzureBlobLength() {
-    return getLong(AZURE_PAGEBLOB_LENGTH, DEFAULT_AZURE_PAGEBLOB_LENGTH);
-  }
-
-  public String getAzureTableName() {
-    return tableName;
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/c3b447ec/samza-azure/src/main/java/org/apache/samza/AzureLeaderElector.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/AzureLeaderElector.java b/samza-azure/src/main/java/org/apache/samza/AzureLeaderElector.java
deleted file mode 100644
index efa8ea1..0000000
--- a/samza-azure/src/main/java/org/apache/samza/AzureLeaderElector.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza;
-
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-import org.apache.samza.coordinator.LeaderElector;
-import org.apache.samza.coordinator.LeaderElectorListener;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * Class to facilitate leader election in Azure.
- * The processor that acquires the lease on the blob becomes the leader.
- * The lease ID is null initially. It is generated by Azure when the processor acquires the lease, and updated accordingly.
- * Every processor requires a valid active lease ID in order to perform successful write and delete operations on the blob.
- * Read operations from the blob are not dependent on the lease ID.
- */
-public class AzureLeaderElector implements LeaderElector {
-
-  private static final Logger LOG = LoggerFactory.getLogger(AzureLeaderElector.class);
-  private static final int LEASE_TIME_IN_SEC = 60;
-  private final LeaseBlobManager leaseBlobManager;
-  private LeaderElectorListener leaderElectorListener = null;
-  private final AtomicReference<String> leaseId;
-  private final AtomicBoolean isLeader;
-
-  public AzureLeaderElector(LeaseBlobManager leaseBlobManager) {
-    this.isLeader = new AtomicBoolean(false);
-    this.leaseBlobManager = leaseBlobManager;
-    this.leaseId = new AtomicReference<>(null);
-  }
-
-  @Override
-  public void setLeaderElectorListener(LeaderElectorListener listener) {
-    this.leaderElectorListener = listener;
-  }
-
-  /**
-   * Tries to become the leader by acquiring a lease on the blob.
-   * The acquireLease operation has a retry policy where upon failure, the operation is tried 3 times at 5 second intervals.
-   * Invokes the listener on becoming the leader.
-   */
-  @Override
-  public void tryBecomeLeader() {
-    try {
-      leaseId.getAndSet(leaseBlobManager.acquireLease(LEASE_TIME_IN_SEC, leaseId.get()));
-      if (leaseId.get() != null) {
-        LOG.info("Became leader with lease ID {}.", leaseId.get());
-        isLeader.set(true);
-        if (leaderElectorListener != null) {
-          leaderElectorListener.onBecomingLeader();
-        }
-      }
-    } catch (AzureException e) {
-      LOG.error("Error while trying to acquire lease.", e);
-    }
-  }
-
-  /**
-   * Releases the lease in order to resign leadership. It also stops all schedulers scheduled by the leader.
-   * The releaseLease operation has a retry policy where upon failure, the operation is tried 3 times at 5 second intervals.
-   */
-  @Override
-  public void resignLeadership() {
-    if (isLeader.get()) {
-      leaseBlobManager.releaseLease(leaseId.get());
-      isLeader.set(false);
-      LOG.info("Resigning leadership with lease ID {}", leaseId.get());
-      leaseId.getAndSet(null);
-    } else {
-      LOG.info("Can't release the lease because it is not the leader and does not hold an active lease.");
-    }
-  }
-
-  /**
-   * Checks whether it's a leader
-   * @return true if it is the leader, false otherwise
-   */
-  @Override
-  public boolean amILeader() {
-    return isLeader.get();
-  }
-
-  public AtomicReference<String> getLeaseId() {
-    return leaseId;
-  }
-
-  public LeaseBlobManager getLeaseBlobManager() {
-    return this.leaseBlobManager;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/c3b447ec/samza-azure/src/main/java/org/apache/samza/BlobUtils.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/BlobUtils.java b/samza-azure/src/main/java/org/apache/samza/BlobUtils.java
deleted file mode 100644
index a798384..0000000
--- a/samza-azure/src/main/java/org/apache/samza/BlobUtils.java
+++ /dev/null
@@ -1,280 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza;
-
-import com.microsoft.azure.storage.AccessCondition;
-import com.microsoft.azure.storage.StorageException;
-import com.microsoft.azure.storage.blob.CloudBlobClient;
-import com.microsoft.azure.storage.blob.CloudBlobContainer;
-import com.microsoft.azure.storage.blob.CloudPageBlob;
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.URISyntaxException;
-import java.util.Arrays;
-import java.util.List;
-import org.apache.samza.job.model.JobModel;
-import org.apache.samza.serializers.model.SamzaObjectMapper;
-import org.eclipse.jetty.http.HttpStatus;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * Client side class that has reference to Azure blob storage.
- * Used for writing and reading from the blob.
- * Every write requires a valid lease ID.
- */
-public class BlobUtils {
-
-  private static final Logger LOG = LoggerFactory.getLogger(BlobUtils.class);
-  private static final long JOB_MODEL_BLOCK_SIZE = 1024000;
-  private static final long BARRIER_STATE_BLOCK_SIZE = 1024;
-  private static final long PROCESSOR_LIST_BLOCK_SIZE = 1024;
-  private CloudBlobClient blobClient;
-  private CloudBlobContainer container;
-  private CloudPageBlob blob;
-
-  /**
-   * Creates an object of BlobUtils. It creates the container and page blob if they don't exist already.
-   * @param client Client handle for access to Azure Storage account.
-   * @param containerName Name of container inside which we want the blob to reside.
-   * @param blobName Name of the blob to be managed.
-   * @param length Length of the page blob.
-   * @throws AzureException If an Azure storage service error occurred, or when the container name or blob name is invalid.
-   */
-  public BlobUtils(AzureClient client, String containerName, String blobName, long length) {
-    this.blobClient = client.getBlobClient();
-    try {
-      this.container = blobClient.getContainerReference(containerName);
-      container.createIfNotExists();
-      this.blob = container.getPageBlobReference(blobName);
-      if (!blob.exists()) {
-        blob.create(length, AccessCondition.generateIfNotExistsCondition(), null, null);
-      }
-    } catch (URISyntaxException e) {
-      LOG.error("Container name: " + containerName + " or blob name: " + blobName + " invalid.", e);
-      throw new AzureException(e);
-    } catch (StorageException e) {
-      int httpStatusCode = e.getHttpStatusCode();
-      if (httpStatusCode == HttpStatus.CONFLICT_409) {
-        LOG.info("The blob you're trying to create exists already.", e);
-      } else {
-        LOG.error("Azure Storage Exception!", e);
-        throw new AzureException(e);
-      }
-    }
-  }
-
-  /**
-   * Writes the job model to the blob.
-   * Write is successful only if the lease ID passed is valid and the processor holds the lease.
-   * Called by the leader.
-   * @param prevJM Previous job model version that the processor was operating on.
-   * @param currJM Current job model version that the processor is operating on.
-   * @param prevJMV Previous job model version that the processor was operating on.
-   * @param currJMV Current job model version that the processor is operating on.
-   * @param leaseId LeaseID of the lease that the processor holds on the blob. Null if there is no lease.
-   * @return true if write to the blob is successful, false if leaseID is null or an Azure storage service error or IO exception occurred.
-   */
-  public boolean publishJobModel(JobModel prevJM, JobModel currJM, String prevJMV, String currJMV, String leaseId) {
-    try {
-      if (leaseId == null) {
-        return false;
-      }
-      JobModelBundle bundle = new JobModelBundle(prevJM, currJM, prevJMV, currJMV);
-      byte[] data = SamzaObjectMapper.getObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsBytes(bundle);
-      byte[] pageData = Arrays.copyOf(data, (int) JOB_MODEL_BLOCK_SIZE);
-      InputStream is = new ByteArrayInputStream(pageData);
-      blob.uploadPages(is, 0, JOB_MODEL_BLOCK_SIZE, AccessCondition.generateLeaseCondition(leaseId), null, null);
-      LOG.info("Uploaded {} jobModel to blob", bundle.getCurrJobModel());
-      return true;
-    } catch (StorageException | IOException e) {
-      LOG.error("JobModel publish failed for version = " + currJMV, e);
-      return false;
-    }
-  }
-
-  /**
-   * Reads the current job model from the blob.
-   * @return The current job model published on the blob. Returns null when job model details not found on the blob.
-   * @throws AzureException in getJobModelBundle() if an Azure storage service error occurred.
-   * @throws SamzaException in getJobModelBundle() if data retrieved from blob could not be parsed by SamzaObjectMapper.
-   */
-  public JobModel getJobModel() {
-    LOG.info("Reading the job model from blob.");
-    JobModelBundle jmBundle = getJobModelBundle();
-    if (jmBundle == null) {
-      LOG.error("Job Model details don't exist on the blob.");
-      return null;
-    }
-    JobModel jm = jmBundle.getCurrJobModel();
-    return jm;
-  }
-
-  /**
-   * Reads the current job model version from the blob .
-   * @return Current job model version published on the blob. Returns null when job model details not found on the blob.
-   * @throws AzureException in getJobModelBundle() if an Azure storage service error occurred.
-   * @throws SamzaException in getJobModelBundle() if data retrieved from blob could not be parsed by SamzaObjectMapper.
-   */
-  public String getJobModelVersion() {
-    LOG.info("Reading the job model version from blob.");
-    JobModelBundle jmBundle = getJobModelBundle();
-    if (jmBundle == null) {
-      LOG.error("Job Model details don't exist on the blob.");
-      return null;
-    }
-    String jmVersion = jmBundle.getCurrJobModelVersion();
-    return jmVersion;
-  }
-
-  /**
-   * Writes the barrier state to the blob.
-   * Write is successful only if the lease ID passed is valid and the processor holds the lease.
-   * Called only by the leader.
-   * @param state Barrier state to be published to the blob.
-   * @param leaseId LeaseID of the valid lease that the processor holds on the blob. Null if there is no lease.
-   * @return true if write to the blob is successful, false if leaseID is null or an Azure storage service error or IO exception occurred.
-   */
-  public boolean publishBarrierState(String state, String leaseId) {
-    try {
-      if (leaseId == null) {
-        return false;
-      }
-      byte[] data = SamzaObjectMapper.getObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsBytes(state);
-      byte[] pageData = Arrays.copyOf(data, (int) BARRIER_STATE_BLOCK_SIZE);
-      InputStream is = new ByteArrayInputStream(pageData);
-
-      //uploadPages is only successful when the AccessCondition provided has an active and valid lease ID. It fails otherwise.
-      blob.uploadPages(is, JOB_MODEL_BLOCK_SIZE, BARRIER_STATE_BLOCK_SIZE, AccessCondition.generateLeaseCondition(leaseId), null, null);
-      LOG.info("Uploaded barrier state {} to blob", state);
-      return true;
-    } catch (StorageException | IOException e) {
-      LOG.error("Barrier state " + state + " publish failed", e);
-      return false;
-    }
-  }
-
-  /**
-   * Reads the current barrier state from the blob.
-   * @return Barrier state published on the blob.
-   * @throws AzureException If an Azure storage service error occurred.
-   * @throws SamzaException If data retrieved from blob could not be parsed by SamzaObjectMapper.
-   */
-  public String getBarrierState() {
-    LOG.info("Reading the barrier state from blob.");
-    byte[] data = new byte[(int) BARRIER_STATE_BLOCK_SIZE];
-    try {
-      blob.downloadRangeToByteArray(JOB_MODEL_BLOCK_SIZE, BARRIER_STATE_BLOCK_SIZE, data, 0);
-    } catch (StorageException e) {
-      LOG.error("Failed to read barrier state from blob.", e);
-      throw new AzureException(e);
-    }
-    String state;
-    try {
-      state = SamzaObjectMapper.getObjectMapper().readValue(data, String.class);
-    } catch (IOException e) {
-      LOG.error("Failed to parse byte data: " + data + " for barrier state retrieved from the blob.", e);
-      throw new SamzaException(e);
-    }
-    return state;
-  }
-
-  /**
-   * Writes the list of live processors in the system to the blob.
-   * Write is successful only if the lease ID passed is valid and the processor holds the lease.
-   * Called only by the leader.
-   * @param processors List of live processors to be published on the blob.
-   * @param leaseId LeaseID of the valid lease that the processor holds on the blob. Null if there is no lease.
-   * @return true if write to the blob is successful, false if leaseID is null or an Azure storage service error or IO exception occurred.
-   */
-  public boolean publishLiveProcessorList(List<String> processors, String leaseId) {
-    try {
-      if (leaseId == null) {
-        return false;
-      }
-      byte[] data = SamzaObjectMapper.getObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsBytes(processors);
-      byte[] pageData = Arrays.copyOf(data, (int) BARRIER_STATE_BLOCK_SIZE);
-      InputStream is = new ByteArrayInputStream(pageData);
-      blob.uploadPages(is, JOB_MODEL_BLOCK_SIZE + BARRIER_STATE_BLOCK_SIZE, PROCESSOR_LIST_BLOCK_SIZE, AccessCondition.generateLeaseCondition(leaseId), null, null);
-      LOG.info("Uploaded list of live processors to blob.");
-      return true;
-    } catch (StorageException | IOException e) {
-      LOG.error("Processor list: " + processors + "publish failed", e);
-      return false;
-    }
-  }
-
-  /**
-   * Reads the list of live processors published on the blob.
-   * @return String list of live processors.
-   * @throws AzureException If an Azure storage service error occurred.
-   * @throws SamzaException If data retrieved from blob could not be parsed by SamzaObjectMapper.
-   */
-  public List<String> getLiveProcessorList() {
-    LOG.info("Read the the list of live processors from blob.");
-    byte[] data = new byte[(int) PROCESSOR_LIST_BLOCK_SIZE];
-    try {
-      blob.downloadRangeToByteArray(JOB_MODEL_BLOCK_SIZE + BARRIER_STATE_BLOCK_SIZE, PROCESSOR_LIST_BLOCK_SIZE, data, 0);
-    } catch (StorageException e) {
-      LOG.error("Failed to read the list of live processors from the blob.", new AzureException(e));
-      throw new AzureException(e);
-    }
-    List<String> list;
-    try {
-      list = SamzaObjectMapper.getObjectMapper().readValue(data, List.class);
-    } catch (IOException e) {
-      LOG.error("Failed to parse byte data: " + data + " for live processor list retrieved from the blob", new SamzaException(e));
-      throw new SamzaException(e);
-    }
-    return list;
-  }
-
-  public CloudBlobClient getBlobClient() {
-    return this.blobClient;
-  }
-
-  public CloudBlobContainer getBlobContainer() {
-    return this.container;
-  }
-
-  public CloudPageBlob getBlob() {
-    return this.blob;
-  }
-
-  private JobModelBundle getJobModelBundle() {
-    byte[] data = new byte[(int) JOB_MODEL_BLOCK_SIZE];
-    try {
-      blob.downloadRangeToByteArray(0, JOB_MODEL_BLOCK_SIZE, data, 0);
-    } catch (StorageException e) {
-      LOG.error("Failed to read JobModel details from the blob.", e);
-      throw new AzureException(e);
-    }
-    try {
-      JobModelBundle jmBundle = SamzaObjectMapper.getObjectMapper().readValue(data, JobModelBundle.class);
-      return jmBundle;
-    } catch (IOException e) {
-      LOG.error("Failed to parse byte data: " + data + " for JobModel details retrieved from the blob", e);
-      throw new SamzaException(e);
-    }
-  }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/c3b447ec/samza-azure/src/main/java/org/apache/samza/JobModelBundle.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/JobModelBundle.java b/samza-azure/src/main/java/org/apache/samza/JobModelBundle.java
deleted file mode 100644
index 3ff971f..0000000
--- a/samza-azure/src/main/java/org/apache/samza/JobModelBundle.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza;
-
-import org.apache.samza.job.model.JobModel;
-
-
-/**
- * Bundle class for current and previous - job model and job model version.
- * Used for publishing updated data to the blob in one go.
- */
-public class JobModelBundle {
-
-  private JobModel prevJobModel;
-  private JobModel currJobModel;
-  private String prevJobModelVersion;
-  private String currJobModelVersion;
-
-  public JobModelBundle() {}
-
-  public JobModelBundle(JobModel prevJM, JobModel currJM, String prevJMV, String currJMV) {
-    prevJobModel = prevJM;
-    currJobModel = currJM;
-    prevJobModelVersion = prevJMV;
-    currJobModelVersion = currJMV;
-  }
-
-  public JobModel getCurrJobModel() {
-    return currJobModel;
-  }
-
-  public JobModel getPrevJobModel() {
-    return prevJobModel;
-  }
-
-  public String getCurrJobModelVersion() {
-    return currJobModelVersion;
-  }
-
-  public String getPrevJobModelVersion() {
-    return prevJobModelVersion;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/c3b447ec/samza-azure/src/main/java/org/apache/samza/LeaseBlobManager.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/LeaseBlobManager.java b/samza-azure/src/main/java/org/apache/samza/LeaseBlobManager.java
deleted file mode 100644
index 5375662..0000000
--- a/samza-azure/src/main/java/org/apache/samza/LeaseBlobManager.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza;
-
-import com.microsoft.azure.storage.AccessCondition;
-import com.microsoft.azure.storage.StorageException;
-import com.microsoft.azure.storage.blob.CloudPageBlob;
-import org.eclipse.jetty.http.HttpStatus;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * Helper class for lease blob operations.
- */
-public class LeaseBlobManager {
-
-  private static final Logger LOG = LoggerFactory.getLogger(LeaseBlobManager.class);
-  private CloudPageBlob leaseBlob;
-
-  public LeaseBlobManager(CloudPageBlob leaseBlob) {
-    this.leaseBlob = leaseBlob;
-  }
-
-  /**
-   * Acquires a lease on a blob. The lease ID is NULL initially.
-   * @param leaseTimeInSec The time in seconds you want to acquire the lease for.
-   * @param leaseId Proposed ID you want to acquire the lease with, null if not proposed.
-   * @return String that represents lease ID.  Null if acquireLease is unsuccessful because the blob is leased already.
-   * @throws AzureException If a Azure storage service error occurred. This includes the case where the blob you're trying to lease does not exist.
-   */
-  public String acquireLease(int leaseTimeInSec, String leaseId) {
-    try {
-      String id = leaseBlob.acquireLease(leaseTimeInSec, leaseId);
-      LOG.info("Acquired lease with lease id = " + id);
-      return id;
-    } catch (StorageException storageException) {
-      int httpStatusCode = storageException.getHttpStatusCode();
-      if (httpStatusCode == HttpStatus.CONFLICT_409) {
-        LOG.info("The blob you're trying to acquire is leased already.", storageException);
-      } else if (httpStatusCode == HttpStatus.NOT_FOUND_404) {
-        LOG.error("The blob you're trying to lease does not exist.", storageException);
-        throw new AzureException(storageException);
-      } else {
-        LOG.error("Error acquiring lease!", storageException);
-        throw new AzureException(storageException);
-      }
-    }
-    return null;
-  }
-
-  /**
-   * Renews the lease on the blob.
-   * @param leaseId ID of the lease to be renewed.
-   * @return True if lease was renewed successfully, false otherwise.
-   */
-  public boolean renewLease(String leaseId) {
-    try {
-      leaseBlob.renewLease(AccessCondition.generateLeaseCondition(leaseId));
-      return true;
-    } catch (StorageException storageException) {
-      LOG.error("Wasn't able to renew lease with lease id: " + leaseId, storageException);
-      return false;
-    }
-  }
-
-  /**
-   * Releases the lease on the blob.
-   * @param leaseId ID of the lease to be released.
-   * @return True if released successfully, false otherwise.
-   */
-  public boolean releaseLease(String leaseId) {
-    try {
-      leaseBlob.releaseLease(AccessCondition.generateLeaseCondition(leaseId));
-      return true;
-    } catch (StorageException storageException) {
-      LOG.error("Wasn't able to release lease with lease id: " + leaseId, storageException);
-      return false;
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/c3b447ec/samza-azure/src/main/java/org/apache/samza/ProcessorEntity.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/ProcessorEntity.java b/samza-azure/src/main/java/org/apache/samza/ProcessorEntity.java
deleted file mode 100644
index 5145821..0000000
--- a/samza-azure/src/main/java/org/apache/samza/ProcessorEntity.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza;
-
-import com.microsoft.azure.storage.table.TableServiceEntity;
-
-
-/**
- * Table schema for Azure processor table.
- * Denotes a row in the table with PARTITION KEY = Job Model Version and ROW KEY = Processor ID
- * Other fields include integer liveness value to which each processor heartbeats,
- * and boolean isLeader value which denotes whether the processor is a leader or not.
- */
-public class ProcessorEntity extends TableServiceEntity {
-  private int liveness;
-  private boolean isLeader;
-
-  public ProcessorEntity() {}
-
-  public ProcessorEntity(String jobModelVersion, String processorId) {
-    this.partitionKey = jobModelVersion;
-    this.rowKey = processorId;
-    this.isLeader = false;
-  }
-
-  /**
-   * Updates heartbeat by updating the liveness value in the table.
-   * @param value
-   */
-  public void setLiveness(int value) {
-    liveness = value;
-  }
-
-  public void setIsLeader(boolean leader) {
-    isLeader = leader;
-  }
-
-  public boolean getIsLeader() {
-    return isLeader;
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/c3b447ec/samza-azure/src/main/java/org/apache/samza/TableUtils.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/TableUtils.java b/samza-azure/src/main/java/org/apache/samza/TableUtils.java
deleted file mode 100644
index e49fd90..0000000
--- a/samza-azure/src/main/java/org/apache/samza/TableUtils.java
+++ /dev/null
@@ -1,198 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza;
-
-import com.microsoft.azure.storage.StorageException;
-import com.microsoft.azure.storage.table.CloudTable;
-import com.microsoft.azure.storage.table.CloudTableClient;
-import com.microsoft.azure.storage.table.TableOperation;
-import com.microsoft.azure.storage.table.TableQuery;
-import java.net.URISyntaxException;
-import java.util.HashSet;
-import java.util.Random;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicReference;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- *  Client side class that has a reference to Azure Table Storage.
- *  Enables the user to add or delete information from the table, make updates to the table and retrieve information from the table.
- *  Every row in a table is uniquely identified by a combination of the PARTIITON KEY and ROW KEY.
- *  PARTITION KEY = Group ID = Job Model Version (for this case).
- *  ROW KEY = Unique entity ID for a group = Processor ID (for this case).
- */
-public class TableUtils {
-
-  private static final Logger LOG = LoggerFactory.getLogger(TableUtils.class);
-  private static final String PARTITION_KEY = "PartitionKey";
-  private static final long CHECK_LIVENESS_DELAY = 30;
-  private static final String INITIAL_STATE = "unassigned";
-  private CloudTableClient tableClient;
-  private CloudTable table;
-
-  public TableUtils(AzureClient client, String tableName) {
-    tableClient = client.getTableClient();
-    try {
-      table = tableClient.getTableReference(tableName);
-      table.createIfNotExists();
-    } catch (URISyntaxException e) {
-      LOG.error("\nConnection string specifies an invalid URI.", new SamzaException(e));
-    } catch (StorageException e) {
-      LOG.error("Azure storage exception.", new SamzaException(e));
-    }
-  }
-
-  /**
-   * Add a row which denotes an active processor to the processor table.
-   * @param jmVersion Job model version that the processor is operating on.
-   * @param pid Unique processor ID.
-   * @param liveness Random heartbeat value.
-   * @param isLeader Denotes whether the processor is a leader or not.
-   * @throws AzureException If an Azure storage service error occurred.
-   */
-  public void addProcessorEntity(String jmVersion, String pid, int liveness, boolean isLeader) {
-    ProcessorEntity entity = new ProcessorEntity(jmVersion, pid);
-    entity.setIsLeader(isLeader);
-    entity.setLiveness(liveness);
-    TableOperation add = TableOperation.insert(entity);
-    try {
-      table.execute(add);
-    } catch (StorageException e) {
-      LOG.error("Azure storage exception while adding processor entity with job model version: " + jmVersion + "and pid: " + pid, e);
-      throw new AzureException(e);
-    }
-  }
-
-  /**
-   * Retrieve a particular row in the processor table, given the partition key and the row key.
-   * @param jmVersion Job model version of the processor row to be retrieved.
-   * @param pid Unique processor ID of the processor row to be retrieved.
-   * @return An instance of required processor entity. Null if does not exist.
-   * @throws AzureException If an Azure storage service error occurred.
-   */
-  public ProcessorEntity getEntity(String jmVersion, String pid) {
-    try {
-      TableOperation retrieveEntity = TableOperation.retrieve(jmVersion, pid, ProcessorEntity.class);
-      ProcessorEntity entity = table.execute(retrieveEntity).getResultAsType();
-      return entity;
-    } catch (StorageException e) {
-      LOG.error("Azure storage exception while retrieving processor entity with job model version: " + jmVersion + "and pid: " + pid, e);
-      throw new AzureException(e);
-    }
-  }
-
-  /**
-   * Updates the liveness value of a particular processor with a randomly generated integer, which in turn updates the last modified since timestamp of the row.
-   * @param jmVersion Job model version of the processor row to be updated.
-   * @param pid Unique processor ID of the processor row to be updated.
-   */
-  public void updateHeartbeat(String jmVersion, String pid) {
-    try {
-      Random rand = new Random();
-      int value = rand.nextInt(10000) + 2;
-      TableOperation retrieveEntity = TableOperation.retrieve(jmVersion, pid, ProcessorEntity.class);
-      ProcessorEntity entity = table.execute(retrieveEntity).getResultAsType();
-      entity.setLiveness(value);
-      TableOperation update = TableOperation.replace(entity);
-      table.execute(update);
-    } catch (StorageException e) {
-      LOG.error("Azure storage exception while updating heartbeat for job model version: " + jmVersion + "and pid: " + pid, e);
-    }
-  }
-
-  /**
-   * Updates the isLeader value when the processor starts or stops being a leader.
-   * @param jmVersion Job model version of the processor row to be updated.
-   * @param pid Unique processor ID of the processor row to be updated.
-   * @param isLeader Denotes whether the processor is a leader or not.
-   * @throws AzureException If an Azure storage service error occurred.
-   */
-  public void updateIsLeader(String jmVersion, String pid, boolean isLeader) {
-    try {
-      TableOperation retrieveEntity = TableOperation.retrieve(jmVersion, pid, ProcessorEntity.class);
-      ProcessorEntity entity = table.execute(retrieveEntity).getResultAsType();
-      entity.setIsLeader(isLeader);
-      TableOperation update = TableOperation.replace(entity);
-      table.execute(update);
-    } catch (StorageException e) {
-      LOG.error("Azure storage exception while updating isLeader value for job model version: " + jmVersion + "and pid: " + pid, e);
-      throw new AzureException(e);
-    }
-  }
-
-  /**
-   * Deletes a specified row in the processor table.
-   * @param jmVersion Job model version of the processor row to be deleted.
-   * @param pid Unique processor ID of the processor row to be deleted.
-   * @throws AzureException If an Azure storage service error occurred.
-   */
-  public void deleteProcessorEntity(String jmVersion, String pid) {
-    try {
-      TableOperation retrieveEntity = TableOperation.retrieve(jmVersion, pid, ProcessorEntity.class);
-      ProcessorEntity entity = table.execute(retrieveEntity).getResultAsType();
-      TableOperation remove = TableOperation.delete(entity);
-      table.execute(remove);
-    } catch (StorageException e) {
-      LOG.error("Azure storage exception while deleting processor entity with job model version: " + jmVersion + "and pid: " + pid, e);
-      throw new AzureException(e);
-    }
-  }
-
-  /**
-   * Retrieve all rows in a table with the given partition key.
-   * @param partitionKey Job model version of the processors to be retrieved.
-   * @return Iterable list of processor entities.
-   */
-  public Iterable<ProcessorEntity> getEntitiesWithPartition(String partitionKey) {
-    String partitionFilter = TableQuery.generateFilterCondition(PARTITION_KEY, TableQuery.QueryComparisons.EQUAL, partitionKey);
-    TableQuery<ProcessorEntity> partitionQuery = TableQuery.from(ProcessorEntity.class).where(partitionFilter);
-    return table.execute(partitionQuery);
-  }
-
-  /**
-   * Gets the list of all active processors that are heartbeating to the processor table.
-   * @param currentJMVersion Current job model version that the processors in the application are operating on.
-   * @return List of ids of currently active processors in the application, retrieved from the processor table.
-   */
-  public Set<String> getActiveProcessorsList(AtomicReference<String> currentJMVersion) {
-    Iterable<ProcessorEntity> tableList = getEntitiesWithPartition(currentJMVersion.get());
-    Set<String> activeProcessorsList = new HashSet<>();
-    for (ProcessorEntity entity: tableList) {
-      if (System.currentTimeMillis() - entity.getTimestamp().getTime() <= CHECK_LIVENESS_DELAY * 1000) {
-        activeProcessorsList.add(entity.getRowKey());
-      }
-    }
-
-    Iterable<ProcessorEntity> unassignedList = getEntitiesWithPartition(INITIAL_STATE);
-    for (ProcessorEntity entity: unassignedList) {
-      if (System.currentTimeMillis() - entity.getTimestamp().getTime() <= CHECK_LIVENESS_DELAY * 1000) {
-        activeProcessorsList.add(entity.getRowKey());
-      }
-    }
-    return activeProcessorsList;
-  }
-
-  public CloudTable getTable() {
-    return table;
-  }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/c3b447ec/samza-azure/src/main/java/org/apache/samza/config/AzureConfig.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/config/AzureConfig.java b/samza-azure/src/main/java/org/apache/samza/config/AzureConfig.java
new file mode 100644
index 0000000..dc96d2d
--- /dev/null
+++ b/samza-azure/src/main/java/org/apache/samza/config/AzureConfig.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.config;
+
+/**
+ * Config class for reading all user defined parameters for Azure driven coordination services.
+ */
+public class AzureConfig extends MapConfig {
+
+  // Connection string for Azure Storage Account, format: "DefaultEndpointsProtocol=<https>;AccountName=<>;AccountKey=<>"
+  public static final String AZURE_STORAGE_CONNECT = "azure.storage.connect";
+  public static final String AZURE_PAGEBLOB_LENGTH = "job.coordinator.azure.blob.length";
+  public static final long DEFAULT_AZURE_PAGEBLOB_LENGTH = 5120000;
+
+  private static String containerName;
+  private static String blobName;
+  private static String tableName;
+
+  public AzureConfig(Config config) {
+    super(config);
+    ApplicationConfig appConfig = new ApplicationConfig(config);
+    //Remove all non-alphanumeric characters from id as table name does not allow them.
+    String id = appConfig.getGlobalAppId().replaceAll("[^A-Za-z0-9]", "");
+    containerName = "samzacontainer" + id;
+    blobName = "samzablob" + id;
+    tableName = "samzatable" + id;
+  }
+
+  public String getAzureConnect() {
+    if (!containsKey(AZURE_STORAGE_CONNECT)) {
+      throw new ConfigException("Missing " + AZURE_STORAGE_CONNECT + " config!");
+    }
+    return get(AZURE_STORAGE_CONNECT);
+  }
+
+  public String getAzureContainerName() {
+    return containerName;
+  }
+
+  public String getAzureBlobName() {
+    return blobName;
+  }
+
+  public long getAzureBlobLength() {
+    return getLong(AZURE_PAGEBLOB_LENGTH, DEFAULT_AZURE_PAGEBLOB_LENGTH);
+  }
+
+  public String getAzureTableName() {
+    return tableName;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/c3b447ec/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinator.java b/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinator.java
new file mode 100644
index 0000000..9438690
--- /dev/null
+++ b/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinator.java
@@ -0,0 +1,509 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.samza.AzureClient;
+import org.apache.samza.config.AzureConfig;
+import org.apache.samza.coordinator.data.BarrierState;
+import org.apache.samza.config.ApplicationConfig;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.ConfigException;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.TaskConfig;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouper;
+import org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouperFactory;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.runtime.ProcessorIdGenerator;
+import org.apache.samza.coordinator.scheduler.HeartbeatScheduler;
+import org.apache.samza.coordinator.scheduler.JMVersionUpgradeScheduler;
+import org.apache.samza.coordinator.scheduler.LeaderBarrierCompleteScheduler;
+import org.apache.samza.coordinator.scheduler.LeaderLivenessCheckScheduler;
+import org.apache.samza.coordinator.scheduler.LivenessCheckScheduler;
+import org.apache.samza.coordinator.scheduler.RenewLeaseScheduler;
+import org.apache.samza.coordinator.scheduler.SchedulerStateChangeListener;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.BlobUtils;
+import org.apache.samza.util.ClassLoaderHelper;
+import org.apache.samza.util.LeaseBlobManager;
+import org.apache.samza.util.TableUtils;
+import org.apache.samza.util.Util;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.collection.JavaConverters;
+
+
+/**
+ * Class that provides coordination mechanism for Samza standalone in Azure.
+ * Handles processor lifecycle through Azure blob and table storage. Orchestrates leader election.
+ * The leader job coordinator generates partition mapping, writes shared data to the blob and manages rebalancing.
+ */
+public class AzureJobCoordinator implements JobCoordinator {
+  private static final Logger LOG = LoggerFactory.getLogger(AzureJobCoordinator.class);
+  private static final int METADATA_CACHE_TTL_MS = 5000;
+  private static final String INITIAL_STATE = "UNASSIGNED";
+  private final Consumer<String> errorHandler;
+  private final AzureLeaderElector azureLeaderElector;
+  private final BlobUtils leaderBlob;
+  private final TableUtils table;
+  private final Config config;
+  private final String processorId;
+  private final AzureClient client;
+  private final AtomicReference<String> currentJMVersion;
+  private final AtomicBoolean versionUpgradeDetected;
+  private final HeartbeatScheduler heartbeat;
+  private final JMVersionUpgradeScheduler versionUpgrade;
+  private final LeaderLivenessCheckScheduler leaderAlive;
+  private LivenessCheckScheduler liveness;
+  private RenewLeaseScheduler renewLease;
+  private LeaderBarrierCompleteScheduler leaderBarrierScheduler;
+  private StreamMetadataCache streamMetadataCache = null;
+  private JobCoordinatorListener coordinatorListener = null;
+  private JobModel jobModel = null;
+
+  /**
+   * Creates an instance of Azure job coordinator, along with references to Azure leader elector, Azure Blob and Azure Table.
+   * @param config User defined config
+   */
+  public AzureJobCoordinator(Config config) {
+    //TODO: Cleanup previous values in the table when barrier times out.
+    this.config = config;
+    processorId = createProcessorId(config);
+    currentJMVersion = new AtomicReference<>(INITIAL_STATE);
+    AzureConfig azureConfig = new AzureConfig(config);
+    client = new AzureClient(azureConfig.getAzureConnect());
+    leaderBlob = new BlobUtils(client, azureConfig.getAzureContainerName(), azureConfig.getAzureBlobName(), azureConfig.getAzureBlobLength());
+    errorHandler = (errorMsg) -> {
+      LOG.error(errorMsg);
+      stop();
+    };
+    table = new TableUtils(client, azureConfig.getAzureTableName(), INITIAL_STATE);
+    azureLeaderElector = new AzureLeaderElector(new LeaseBlobManager(leaderBlob.getBlob()));
+    azureLeaderElector.setLeaderElectorListener(new AzureLeaderElectorListener());
+    versionUpgradeDetected = new AtomicBoolean(false);
+    heartbeat = new HeartbeatScheduler(errorHandler, table, currentJMVersion, processorId);
+    versionUpgrade = new JMVersionUpgradeScheduler(errorHandler, leaderBlob, currentJMVersion, versionUpgradeDetected, processorId);
+    leaderAlive = new LeaderLivenessCheckScheduler(errorHandler, table, leaderBlob, currentJMVersion, INITIAL_STATE);
+    leaderBarrierScheduler = null;
+    renewLease = null;
+    liveness = null;
+  }
+
+  @Override
+  public void start() {
+
+    LOG.info("Starting Azure job coordinator.");
+    streamMetadataCache = StreamMetadataCache.apply(METADATA_CACHE_TTL_MS, config);
+    table.addProcessorEntity(INITIAL_STATE, processorId, false);
+
+    // Start scheduler for heartbeating
+    LOG.info("Starting scheduler for heartbeating.");
+    heartbeat.scheduleTask();
+
+    azureLeaderElector.tryBecomeLeader();
+
+    // Start scheduler to check for job model version upgrades
+    LOG.info("Starting scheduler to check for job model version upgrades.");
+    versionUpgrade.setStateChangeListener(createJMVersionUpgradeListener());
+    versionUpgrade.scheduleTask();
+
+    // Start scheduler to check for leader liveness
+    LOG.info("Starting scheduler to check for leader liveness.");
+    leaderAlive.setStateChangeListener(createLeaderLivenessListener());
+    leaderAlive.scheduleTask();
+  }
+
+  @Override
+  public void stop() {
+    LOG.info("Shutting down Azure job coordinator.");
+
+    if (coordinatorListener != null) {
+      coordinatorListener.onJobModelExpired();
+    }
+
+    // Resign leadership
+    if (azureLeaderElector.amILeader()) {
+      azureLeaderElector.resignLeadership();
+    }
+
+    // Shutdown all schedulers
+    shutdownSchedulers();
+
+    if (coordinatorListener != null) {
+      coordinatorListener.onCoordinatorStop();
+    }
+  }
+
+  @Override
+  public String getProcessorId() {
+    return processorId;
+  }
+
+  @Override
+  public void setListener(JobCoordinatorListener listener) {
+    this.coordinatorListener = listener;
+  }
+
+  @Override
+  public JobModel getJobModel() {
+    return jobModel;
+  }
+
+  private void shutdownSchedulers() {
+    if (renewLease != null) {
+      renewLease.shutdown();
+    }
+    if (leaderBarrierScheduler != null) {
+      leaderBarrierScheduler.shutdown();
+    }
+    if (liveness != null) {
+      liveness.shutdown();
+    }
+    heartbeat.shutdown();
+    leaderAlive.shutdown();
+    versionUpgrade.shutdown();
+  }
+
+  /**
+   * Creates a listener for LeaderBarrierCompleteScheduler class.
+   * Invoked by the leader when it detects that rebalancing has completed by polling the processor table.
+   * Updates the barrier state on the blob to denote that the barrier has completed.
+   * Cancels all future tasks scheduled by the LeaderBarrierComplete scheduler to check if barrier has completed.
+   * @return an instance of SchedulerStateChangeListener.
+   */
+  private SchedulerStateChangeListener createLeaderBarrierCompleteListener(String nextJMVersion, AtomicBoolean barrierTimeout) {
+    return () -> {
+      versionUpgradeDetected.getAndSet(false);
+      String state;
+      if (barrierTimeout.get()) {
+        LOG.error("Barrier timed out for version {}", nextJMVersion);
+        state = BarrierState.TIMEOUT.name() + " " + nextJMVersion;
+      } else {
+        LOG.info("Leader detected barrier completion.");
+        state = BarrierState.END.name() + " " + nextJMVersion;
+      }
+      if (!leaderBlob.publishBarrierState(state, azureLeaderElector.getLeaseId().get())) {
+        LOG.info("Leader failed to publish the job model {}. Stopping the processor with PID: .", jobModel, processorId);
+        stop();
+        table.deleteProcessorEntity(currentJMVersion.get(), processorId);
+      }
+      leaderBarrierScheduler.shutdown();
+    };
+  }
+
+  /**
+   * Creates a listener for LivenessCheckScheduler class.
+   * Invoked by the leader when the list of active processors in the system changes.
+   * @return an instance of SchedulerStateChangeListener.
+   */
+  private SchedulerStateChangeListener createLivenessListener(AtomicReference<List<String>> liveProcessors) {
+    return () -> {
+      LOG.info("Leader detected change in list of live processors.");
+      doOnProcessorChange(liveProcessors.get());
+    };
+  }
+
+  /**
+   * Creates a listener for JMVersionUpgradeScheduler class.
+   * Invoked when the processor detects a job model version upgrade on the blob.
+   * Stops listening for job model version upgrades until rebalancing achieved.
+   * @return an instance of SchedulerStateChangeListener.
+   */
+  private SchedulerStateChangeListener createJMVersionUpgradeListener() {
+    return () -> {
+      LOG.info("Job model version upgrade detected.");
+      versionUpgradeDetected.getAndSet(true);
+      onNewJobModelAvailable(leaderBlob.getJobModelVersion());
+    };
+  }
+
+  /**
+   * Creates a listener for LeaderLivenessCheckScheduler class.
+   * Invoked when an existing leader dies. Enables the JC to participate in leader election again.
+   * @return an instance of SchedulerStateChangeListener.
+   */
+  private SchedulerStateChangeListener createLeaderLivenessListener() {
+    return () -> {
+      LOG.info("Existing leader died.");
+      azureLeaderElector.tryBecomeLeader();
+    };
+  }
+
+  /**
+   * For each input stream specified in config, exactly determine its
+   * partitions, returning a set of SystemStreamPartitions containing them all.
+   */
+  private Set<SystemStreamPartition> getInputStreamPartitions() {
+    TaskConfig taskConfig = new TaskConfig(config);
+    scala.collection.immutable.Set<SystemStream> inputSystemStreams = taskConfig.getInputStreams();
+
+    // Get the set of partitions for each SystemStream from the stream metadata
+    Set<SystemStreamPartition>
+        sspSet = JavaConverters.mapAsJavaMapConverter(streamMetadataCache.getStreamMetadata(inputSystemStreams, true)).asJava()
+        .entrySet()
+        .stream()
+        .flatMap(this::mapSSMToSSP)
+        .collect(Collectors.toSet());
+
+    return sspSet;
+  }
+
+  private Stream<SystemStreamPartition> mapSSMToSSP(Map.Entry<SystemStream, SystemStreamMetadata> ssMs) {
+    return ssMs.getValue()
+        .getSystemStreamPartitionMetadata()
+        .keySet()
+        .stream()
+        .map(partition -> new SystemStreamPartition(ssMs.getKey(), partition));
+  }
+
+  /**
+   * Gets a SystemStreamPartitionGrouper object from the configuration.
+   */
+  private SystemStreamPartitionGrouper getSystemStreamPartitionGrouper() {
+    JobConfig jobConfig = new JobConfig(config);
+    String factoryString = jobConfig.getSystemStreamPartitionGrouperFactory();
+    SystemStreamPartitionGrouper grouper = Util.<SystemStreamPartitionGrouperFactory>getObj(factoryString).getSystemStreamPartitionGrouper(jobConfig);
+    return grouper;
+  }
+
+  private int getMaxNumTasks() {
+    // Do grouping to fetch TaskName to SSP mapping
+    Set<SystemStreamPartition> allSystemStreamPartitions = getInputStreamPartitions();
+    SystemStreamPartitionGrouper grouper = getSystemStreamPartitionGrouper();
+    Map<TaskName, Set<SystemStreamPartition>> groups = grouper.group(allSystemStreamPartitions);
+    LOG.info("SystemStreamPartitionGrouper " + grouper.toString() + " has grouped the SystemStreamPartitions into " + Integer.toString(groups.size()) +
+        " tasks with the following taskNames: {}", groups.keySet());
+    return groups.size();
+  }
+
+  /**
+   * Called only by the leader, either when the processor becomes the leader, or when the list of live processors changes.
+   * @param currentProcessorIds New updated list of processor IDs which caused the rebalancing.
+   */
+  private void doOnProcessorChange(List<String> currentProcessorIds) {
+    // if list of processors is empty - it means we are called from 'onBecomeLeader'
+
+    // Check if number of processors is greater than number of tasks
+    List<String> initialProcessorIds = new ArrayList<>(currentProcessorIds);
+    int numTasks = getMaxNumTasks();
+    if (currentProcessorIds.size() > numTasks) {
+      int iterator = 0;
+      while (currentProcessorIds.size() != numTasks) {
+        if (!currentProcessorIds.get(iterator).equals(processorId)) {
+          currentProcessorIds.remove(iterator);
+          iterator++;
+        }
+      }
+    }
+    LOG.info("currentProcessorIds = {}", currentProcessorIds);
+    LOG.info("initialProcessorIds = {}", initialProcessorIds);
+
+    String nextJMVersion;
+    String prevJMVersion = currentJMVersion.get();
+    JobModel prevJobModel = jobModel;
+    AtomicBoolean barrierTimeout = new AtomicBoolean(false);
+
+    if (currentProcessorIds.isEmpty()) {
+      if (currentJMVersion.get().equals(INITIAL_STATE)) {
+        nextJMVersion = "1";
+      } else {
+        nextJMVersion = Integer.toString(Integer.valueOf(prevJMVersion) + 1);
+      }
+      currentProcessorIds = new ArrayList<>(table.getActiveProcessorsList(currentJMVersion));
+      initialProcessorIds = currentProcessorIds;
+    } else {
+      //Check if previous barrier not reached, then previous barrier times out.
+      String blobJMV = leaderBlob.getJobModelVersion();
+      nextJMVersion = Integer.toString(Integer.valueOf(prevJMVersion) + 1);
+      if (blobJMV != null && Integer.valueOf(blobJMV) > Integer.valueOf(prevJMVersion)) {
+        prevJMVersion = blobJMV;
+        prevJobModel = leaderBlob.getJobModel();
+        nextJMVersion = Integer.toString(Integer.valueOf(blobJMV) + 1);
+        versionUpgradeDetected.getAndSet(false);
+        leaderBarrierScheduler.shutdown();
+        leaderBlob.publishBarrierState(BarrierState.TIMEOUT.name() + " " + blobJMV, azureLeaderElector.getLeaseId().get());
+      }
+    }
+
+    // Generate the new JobModel
+    JobModel newJobModel = JobModelManager.readJobModel(this.config, Collections.emptyMap(),
+        null, streamMetadataCache, currentProcessorIds);
+    LOG.info("pid=" + processorId + "Generated new Job Model. Version = " + nextJMVersion);
+
+    // Publish the new job model
+    boolean jmWrite = leaderBlob.publishJobModel(prevJobModel, newJobModel, prevJMVersion, nextJMVersion, azureLeaderElector.getLeaseId().get());
+    // Publish barrier state
+    boolean barrierWrite = leaderBlob.publishBarrierState(BarrierState.START.name() + " " + nextJMVersion, azureLeaderElector.getLeaseId().get());
+    barrierTimeout.set(false);
+    // Publish list of processors this function was called with
+    boolean processorWrite = leaderBlob.publishLiveProcessorList(initialProcessorIds, azureLeaderElector.getLeaseId().get());
+
+    //Shut down processor if write fails even after retries. These writes have an inherent retry policy.
+    if (!jmWrite || !barrierWrite || !processorWrite) {
+      LOG.info("Leader failed to publish the job model {}. Stopping the processor with PID: .", jobModel, processorId);
+      stop();
+      table.deleteProcessorEntity(currentJMVersion.get(), processorId);
+    }
+
+    LOG.info("pid=" + processorId + "Published new Job Model. Version = " + nextJMVersion);
+
+    // Start scheduler to check if barrier reached
+    long startTime = System.currentTimeMillis();
+    leaderBarrierScheduler = new LeaderBarrierCompleteScheduler(errorHandler, table, nextJMVersion, initialProcessorIds, startTime, barrierTimeout, currentJMVersion, processorId);
+    leaderBarrierScheduler.setStateChangeListener(createLeaderBarrierCompleteListener(nextJMVersion, barrierTimeout));
+    leaderBarrierScheduler.scheduleTask();
+  }
+
+  /**
+   * Called when the JC detects a job model version upgrade on the shared blob.
+   * @param nextJMVersion The new job model version after rebalancing.
+   */
+  private void onNewJobModelAvailable(final String nextJMVersion) {
+    LOG.info("pid=" + processorId + "new JobModel available with job model version {}", nextJMVersion);
+
+    //Get the new job model from blob
+    jobModel = leaderBlob.getJobModel();
+    LOG.info("pid=" + processorId + ": new JobModel available. ver=" + nextJMVersion + "; jm = " + jobModel);
+
+    if (!jobModel.getContainers().containsKey(processorId)) {
+      LOG.info("JobModel: {} does not contain the processorId: {}. Stopping the processor.", jobModel, processorId);
+      stop();
+      table.deleteProcessorEntity(currentJMVersion.get(), processorId);
+    } else {
+      //Stop current work
+      if (coordinatorListener != null) {
+        coordinatorListener.onJobModelExpired();
+      }
+      // Add entry with new job model version to the processor table
+      table.addProcessorEntity(nextJMVersion, processorId, azureLeaderElector.amILeader());
+
+      // Start polling blob to check if barrier reached
+      Random random = new Random();
+      String blobBarrierState = leaderBlob.getBarrierState();
+      while (true) {
+        if (blobBarrierState.equals(BarrierState.END.name() + " " + nextJMVersion)) {
+          LOG.info("Barrier completion detected by the worker for barrier version {}.", nextJMVersion);
+          versionUpgradeDetected.getAndSet(false);
+          onNewJobModelConfirmed(nextJMVersion);
+          break;
+        } else if (blobBarrierState.equals(BarrierState.TIMEOUT.name() + " " + nextJMVersion) ||
+            (Integer.valueOf(leaderBlob.getJobModelVersion()) > Integer.valueOf(nextJMVersion))) {
+          LOG.info("Barrier timed out for version number {}", nextJMVersion);
+          versionUpgradeDetected.getAndSet(false);
+          break;
+        } else {
+          try {
+            Thread.sleep(random.nextInt(5000));
+          } catch (InterruptedException e) {
+            Thread.interrupted();
+          }
+          LOG.info("Checking for barrier state on the blob again...");
+          blobBarrierState = leaderBlob.getBarrierState();
+        }
+      }
+    }
+  }
+
+  /**
+   * Called when the JC detects that the barrier has completed by checking the barrier state on the blob.
+   * @param nextJMVersion The new job model version after rebalancing.
+   */
+  private void onNewJobModelConfirmed(final String nextJMVersion) {
+    LOG.info("pid=" + processorId + "new version " + nextJMVersion + " of the job model got confirmed");
+
+    // Delete previous value
+    if (table.getEntity(currentJMVersion.get(), processorId) != null) {
+      table.deleteProcessorEntity(currentJMVersion.get(), processorId);
+    }
+    if (table.getEntity(INITIAL_STATE, processorId) != null) {
+      table.deleteProcessorEntity(INITIAL_STATE, processorId);
+    }
+
+    //Start heartbeating to new entry only when barrier reached.
+    //Changing the current job model version enables that since we are heartbeating to a row identified by the current job model version.
+    currentJMVersion.getAndSet(nextJMVersion);
+
+    //Start the container with the new model
+    if (coordinatorListener != null) {
+      coordinatorListener.onNewJobModel(processorId, jobModel);
+    }
+  }
+
+  private String createProcessorId(Config config) {
+    // TODO: This check to be removed after 0.13+
+    ApplicationConfig appConfig = new ApplicationConfig(config);
+    if (appConfig.getProcessorId() != null) {
+      return appConfig.getProcessorId();
+    } else if (StringUtils.isNotBlank(appConfig.getAppProcessorIdGeneratorClass())) {
+      ProcessorIdGenerator idGenerator =
+          ClassLoaderHelper.fromClassName(appConfig.getAppProcessorIdGeneratorClass(), ProcessorIdGenerator.class);
+      return idGenerator.generateProcessorId(config);
+    } else {
+      throw new ConfigException(String
+          .format("Expected either %s or %s to be configured", ApplicationConfig.PROCESSOR_ID,
+              ApplicationConfig.APP_PROCESSOR_ID_GENERATOR_CLASS));
+    }
+  }
+
+  public class AzureLeaderElectorListener implements LeaderElectorListener {
+    /**
+     * Keep renewing the lease and do the required tasks as a leader.
+     */
+    @Override
+    public void onBecomingLeader() {
+      // Update table to denote that it is a leader.
+      table.updateIsLeader(currentJMVersion.get(), processorId, true);
+
+      // Schedule a task to renew the lease after a fixed time interval
+      LOG.info("Starting scheduler to keep renewing lease held by the leader.");
+      renewLease = new RenewLeaseScheduler((errorMsg) -> {
+          LOG.error(errorMsg);
+          table.updateIsLeader(currentJMVersion.get(), processorId, false);
+          azureLeaderElector.resignLeadership();
+          renewLease.shutdown();
+          liveness.shutdown();
+        }, azureLeaderElector.getLeaseBlobManager(), azureLeaderElector.getLeaseId());
+      renewLease.scheduleTask();
+
+      doOnProcessorChange(new ArrayList<>());
+
+      // Start scheduler to check for change in list of live processors
+      LOG.info("Starting scheduler to check for change in list of live processors in the system.");
+      liveness = new LivenessCheckScheduler(errorHandler, table, leaderBlob, currentJMVersion, processorId);
+      liveness.setStateChangeListener(createLivenessListener(liveness.getLiveProcessors()));
+      liveness.scheduleTask();
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/c3b447ec/samza-azure/src/main/java/org/apache/samza/coordinator/AzureLeaderElector.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/coordinator/AzureLeaderElector.java b/samza-azure/src/main/java/org/apache/samza/coordinator/AzureLeaderElector.java
new file mode 100644
index 0000000..c93f1d0
--- /dev/null
+++ b/samza-azure/src/main/java/org/apache/samza/coordinator/AzureLeaderElector.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.samza.AzureException;
+import org.apache.samza.util.LeaseBlobManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Class to facilitate leader election in Azure.
+ * The processor that acquires the lease on the blob becomes the leader.
+ * The lease ID is null initially. It is generated by Azure when the processor acquires the lease, and updated accordingly.
+ * Every processor requires a valid active lease ID in order to perform successful write and delete operations on the blob.
+ * Read operations from the blob are not dependent on the lease ID.
+ */
+public class AzureLeaderElector implements LeaderElector {
+
+  private static final Logger LOG = LoggerFactory.getLogger(AzureLeaderElector.class);
+  private static final int LEASE_TIME_IN_SEC = 60;
+  private final LeaseBlobManager leaseBlobManager;
+  private LeaderElectorListener leaderElectorListener = null;
+  private final AtomicReference<String> leaseId;
+  private final AtomicBoolean isLeader;
+
+  public AzureLeaderElector(LeaseBlobManager leaseBlobManager) {
+    this.isLeader = new AtomicBoolean(false);
+    this.leaseBlobManager = leaseBlobManager;
+    this.leaseId = new AtomicReference<>(null);
+  }
+
+  @Override
+  public void setLeaderElectorListener(LeaderElectorListener listener) {
+    this.leaderElectorListener = listener;
+  }
+
+  /**
+   * Tries to become the leader by acquiring a lease on the blob.
+   * The acquireLease operation has a retry policy where upon failure, the operation is tried 3 times at 5 second intervals.
+   * Invokes the listener on becoming the leader.
+   * @throws AzureException If a Azure storage service error occurred. This includes the case where the blob you're trying to lease does not exist.
+   */
+  @Override
+  public void tryBecomeLeader() throws AzureException {
+    leaseId.getAndSet(leaseBlobManager.acquireLease(LEASE_TIME_IN_SEC, leaseId.get()));
+    if (leaseId.get() != null) {
+      LOG.info("Became leader with lease ID {}.", leaseId.get());
+      isLeader.set(true);
+      if (leaderElectorListener != null) {
+        leaderElectorListener.onBecomingLeader();
+      }
+    } else {
+      LOG.info("Unable to become the leader. Continuing as a worker.");
+    }
+  }
+
+  /**
+   * Releases the lease in order to resign leadership. It also stops all schedulers scheduled by the leader.
+   * The releaseLease operation has a retry policy where upon failure, the operation is tried 3 times at 5 second intervals.
+   */
+  @Override
+  public void resignLeadership() {
+    if (isLeader.get()) {
+      leaseBlobManager.releaseLease(leaseId.get());
+      isLeader.set(false);
+      LOG.info("Resigning leadership with lease ID {}", leaseId.get());
+      leaseId.getAndSet(null);
+    } else {
+      LOG.info("Can't release the lease because it is not the leader and does not hold an active lease.");
+    }
+  }
+
+  /**
+   * Checks whether it's a leader
+   * @return true if it is the leader, false otherwise
+   */
+  @Override
+  public boolean amILeader() {
+    return isLeader.get();
+  }
+
+  public AtomicReference<String> getLeaseId() {
+    return leaseId;
+  }
+
+  public LeaseBlobManager getLeaseBlobManager() {
+    return this.leaseBlobManager;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/c3b447ec/samza-azure/src/main/java/org/apache/samza/coordinator/data/BarrierState.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/coordinator/data/BarrierState.java b/samza-azure/src/main/java/org/apache/samza/coordinator/data/BarrierState.java
new file mode 100644
index 0000000..1c144de
--- /dev/null
+++ b/samza-azure/src/main/java/org/apache/samza/coordinator/data/BarrierState.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator.data;
+
+/**
+ * Enum depicting different barrier states.
+ */
+public enum BarrierState {
+  START, END, TIMEOUT
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/c3b447ec/samza-azure/src/main/java/org/apache/samza/coordinator/data/JobModelBundle.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/coordinator/data/JobModelBundle.java b/samza-azure/src/main/java/org/apache/samza/coordinator/data/JobModelBundle.java
new file mode 100644
index 0000000..d05e0a5
--- /dev/null
+++ b/samza-azure/src/main/java/org/apache/samza/coordinator/data/JobModelBundle.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator.data;
+
+import org.apache.samza.job.model.JobModel;
+
+
+/**
+ * Bundle class for current and previous - job model and job model version.
+ * Used for publishing updated data to the blob in one go.
+ */
+public class JobModelBundle {
+
+  private JobModel prevJobModel;
+  private JobModel currJobModel;
+  private String prevJobModelVersion;
+  private String currJobModelVersion;
+
+  public JobModelBundle() {}
+
+  public JobModelBundle(JobModel prevJM, JobModel currJM, String prevJMV, String currJMV) {
+    prevJobModel = prevJM;
+    currJobModel = currJM;
+    prevJobModelVersion = prevJMV;
+    currJobModelVersion = currJMV;
+  }
+
+  public JobModel getCurrJobModel() {
+    return currJobModel;
+  }
+
+  public JobModel getPrevJobModel() {
+    return prevJobModel;
+  }
+
+  public String getCurrJobModelVersion() {
+    return currJobModelVersion;
+  }
+
+  public String getPrevJobModelVersion() {
+    return prevJobModelVersion;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/c3b447ec/samza-azure/src/main/java/org/apache/samza/coordinator/data/ProcessorEntity.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/coordinator/data/ProcessorEntity.java b/samza-azure/src/main/java/org/apache/samza/coordinator/data/ProcessorEntity.java
new file mode 100644
index 0000000..9323bde
--- /dev/null
+++ b/samza-azure/src/main/java/org/apache/samza/coordinator/data/ProcessorEntity.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator.data;
+
+import com.microsoft.azure.storage.table.TableServiceEntity;
+import java.util.Random;
+
+
+/**
+ * Table schema for Azure processor table.
+ * Denotes a row in the table with PARTITION KEY = Job Model Version and ROW KEY = Processor ID
+ * Other fields include integer liveness value to which each processor heartbeats,
+ * and boolean isLeader value which denotes whether the processor is a leader or not.
+ */
+public class ProcessorEntity extends TableServiceEntity {
+  private Random rand = new Random();
+  private int liveness;
+  private boolean isLeader;
+
+  public ProcessorEntity() {}
+
+  public ProcessorEntity(String jobModelVersion, String processorId) {
+    this.partitionKey = jobModelVersion;
+    this.rowKey = processorId;
+    this.isLeader = false;
+    this.liveness = rand.nextInt(10000) + 2;
+  }
+
+  /**
+   * Updates heartbeat by updating the liveness value in the table.
+   * Sets the liveness field to a random integer value in order to update the last modified since timestamp of the row in the table.
+   * This asserts to the leader that the processor is alive.
+   */
+  public void updateLiveness() {
+    liveness = rand.nextInt(10000) + 2;
+  }
+
+  public void setIsLeader(boolean leader) {
+    isLeader = leader;
+  }
+
+  public boolean getIsLeader() {
+    return isLeader;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/c3b447ec/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/HeartbeatScheduler.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/HeartbeatScheduler.java b/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/HeartbeatScheduler.java
new file mode 100644
index 0000000..2abb380
--- /dev/null
+++ b/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/HeartbeatScheduler.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator.scheduler;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import org.apache.samza.util.TableUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Scheduler invoked by each processor for heartbeating to a row of the table.
+ * Heartbeats every 5 seconds.
+ * The row is determined by the job model version and processor id passed to the scheduler.
+ * All time units are in SECONDS.
+ */
+public class HeartbeatScheduler implements TaskScheduler {
+
+  private static final Logger LOG = LoggerFactory.getLogger(HeartbeatScheduler.class);
+  private static final long HEARTBEAT_DELAY_SEC = 5;
+  private static final ThreadFactory PROCESSOR_THREAD_FACTORY =
+      new ThreadFactoryBuilder().setNameFormat("HeartbeatScheduler-%d").build();
+  private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(PROCESSOR_THREAD_FACTORY);
+  private final String processorId;
+  private final TableUtils table;
+  private final AtomicReference<String> currentJMVersion;
+  private final Consumer<String> errorHandler;
+
+  public HeartbeatScheduler(Consumer<String> errorHandler, TableUtils table, AtomicReference<String> currentJMVersion, final String pid) {
+    this.table = table;
+    this.currentJMVersion = currentJMVersion;
+    processorId = pid;
+    this.errorHandler = errorHandler;
+  }
+
+  @Override
+  public ScheduledFuture scheduleTask() {
+    return scheduler.scheduleWithFixedDelay(() -> {
+        try {
+          String currJVM = currentJMVersion.get();
+          LOG.info("Updating heartbeat for processor ID: " + processorId + " and job model version: " + currJVM);
+          table.updateHeartbeat(currJVM, processorId);
+        } catch (Exception e) {
+          errorHandler.accept("Exception in Heartbeat Scheduler. Stopping the processor...");
+        }
+      }, HEARTBEAT_DELAY_SEC, HEARTBEAT_DELAY_SEC, TimeUnit.SECONDS);
+  }
+
+  @Override
+  public void setStateChangeListener(SchedulerStateChangeListener listener) {}
+
+  @Override
+  public void shutdown() {
+    LOG.info("Shutting down HeartbeatScheduler");
+    scheduler.shutdownNow();
+  }
+}
\ No newline at end of file