You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by na...@apache.org on 2017/08/18 21:38:55 UTC
[1/2] samza git commit: SAMZA-1378: Introduce and Implement Scheduler
Interface for Polling in Azure
Repository: samza
Updated Branches:
refs/heads/master 1d253c757 -> c3b447ecb
http://git-wip-us.apache.org/repos/asf/samza/blob/c3b447ec/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/JMVersionUpgradeScheduler.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/JMVersionUpgradeScheduler.java b/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/JMVersionUpgradeScheduler.java
new file mode 100644
index 0000000..ded014f
--- /dev/null
+++ b/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/JMVersionUpgradeScheduler.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator.scheduler;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import org.apache.samza.coordinator.data.BarrierState;
+import org.apache.samza.util.BlobUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Scheduler invoked by each processor to check for job model version upgrades on the blob.
+ * Checks every 5 seconds.
+ * The processor polls the leader blob in order to track this.
+ * All time units are in SECONDS.
+ */
+public class JMVersionUpgradeScheduler implements TaskScheduler {
+ private static final Logger LOG = LoggerFactory.getLogger(JMVersionUpgradeScheduler.class);
+ private static final long JMV_UPGRADE_DELAY_SEC = 5;
+ private static final ThreadFactory
+ PROCESSOR_THREAD_FACTORY = new ThreadFactoryBuilder().setNameFormat("JMVersionUpgradeScheduler-%d").build();
+ private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(PROCESSOR_THREAD_FACTORY);
+ private final BlobUtils blob;
+ private final AtomicReference<String> currentJMVersion;
+ private final AtomicBoolean versionUpgradeDetected;
+ private final String processorId;
+ private final Consumer<String> errorHandler;
+ private SchedulerStateChangeListener listener = null;
+
+ public JMVersionUpgradeScheduler(Consumer<String> errorHandler, BlobUtils blob,
+ AtomicReference<String> currentJMVersion, AtomicBoolean versionUpgradeDetected, String processorId) {
+ this.blob = blob;
+ this.currentJMVersion = currentJMVersion;
+ this.versionUpgradeDetected = versionUpgradeDetected;
+ this.processorId = processorId;
+ this.errorHandler = errorHandler;
+ }
+
+ @Override
+ public ScheduledFuture scheduleTask() {
+ return scheduler.scheduleWithFixedDelay(() -> {
+ try {
+ LOG.info("Checking for job model version upgrade");
+ // Read job model version from the blob.
+ String blobJMV = blob.getJobModelVersion();
+ LOG.info("Job Model Version seen on the blob: {}", blobJMV);
+ String blobBarrierState = blob.getBarrierState();
+ String currentJMV = currentJMVersion.get();
+ LOG.info("Current Job Model Version that the job coordinator is working on: {}", currentJMV);
+ String expectedBarrierState = BarrierState.START.toString() + " " + blobJMV;
+ List<String> processorList = blob.getLiveProcessorList();
+ // Check if the job model version on the blob is consistent with the job model version that the processor is operating on.
+ if (processorList != null && processorList.contains(processorId) && !currentJMV.equals(blobJMV) && blobBarrierState.equals(expectedBarrierState) && !versionUpgradeDetected.get()) {
+ listener.onStateChange();
+ }
+ } catch (Exception e) {
+ errorHandler.accept("Exception in Job Model Version Upgrade Scheduler. Stopping the processor...");
+ }
+ }, JMV_UPGRADE_DELAY_SEC, JMV_UPGRADE_DELAY_SEC, TimeUnit.SECONDS);
+ }
+
+ @Override
+ public void setStateChangeListener(SchedulerStateChangeListener listener) {
+ this.listener = listener;
+ }
+
+ @Override
+ public void shutdown() {
+ LOG.info("Shutting down JMVersionUpgradeScheduler Scheduler.");
+ scheduler.shutdownNow();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/c3b447ec/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/LeaderBarrierCompleteScheduler.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/LeaderBarrierCompleteScheduler.java b/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/LeaderBarrierCompleteScheduler.java
new file mode 100644
index 0000000..7386fa9
--- /dev/null
+++ b/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/LeaderBarrierCompleteScheduler.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator.scheduler;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import org.apache.samza.coordinator.data.ProcessorEntity;
+import org.apache.samza.util.TableUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Scheduler class invoked by the leader to check if the barrier has completed.
+ * Checks every 5 seconds.
+ * The leader polls the Azure processor table in order to track this.
+ * The barrier is complete when the set of processors listed alive on the blob is exactly the set of
+ * processors that have entries in the Azure table under the new job model version.
+ * The listener is also notified, with {@code barrierTimeout} set to {@code true}, if more than 30 seconds
+ * have passed since {@code startTime} or if this processor is no longer marked as the leader in the table.
+ * All time units are in SECONDS, except {@code startTime}, which is in epoch milliseconds.
+ */
+public class LeaderBarrierCompleteScheduler implements TaskScheduler {
+
+  private static final Logger LOG = LoggerFactory.getLogger(LeaderBarrierCompleteScheduler.class);
+  private static final long BARRIER_REACHED_DELAY_SEC = 5;
+  private static final long BARRIER_TIMEOUT_SEC = 30;
+  private static final ThreadFactory
+      PROCESSOR_THREAD_FACTORY = new ThreadFactoryBuilder().setNameFormat("LeaderBarrierCompleteScheduler-%d").build();
+  private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(PROCESSOR_THREAD_FACTORY);
+  private final TableUtils table;
+  private final String nextJMVersion;
+  private final Set<String> blobProcessorSet;
+  private final long startTime;
+  private final AtomicBoolean barrierTimeout;
+  private final Consumer<String> errorHandler;
+  private final String processorId;
+  private final AtomicReference<String> currentJMVersion;
+  private SchedulerStateChangeListener listener = null;
+
+  /**
+   * @param errorHandler invoked with an error message if a polling iteration throws
+   * @param table Azure processor table
+   * @param nextJMVersion job model version the barrier waits on (the table partition that is inspected)
+   * @param blobProcessorList live processors as published on the blob; copied into a set at construction
+   * @param startTime barrier start time in epoch milliseconds; compared against {@link System#currentTimeMillis()}
+   * @param barrierTimeout set to {@code true} when the barrier times out or this processor loses leadership
+   * @param currentJMVersion current job model version; used to look up this processor's own table row
+   * @param pid ID of this processor
+   */
+  public LeaderBarrierCompleteScheduler(Consumer<String> errorHandler, TableUtils table, String nextJMVersion,
+      List<String> blobProcessorList, long startTime, AtomicBoolean barrierTimeout, AtomicReference<String> currentJMVersion, final String pid) {
+    this.table = table;
+    this.nextJMVersion = nextJMVersion;
+    this.blobProcessorSet = new HashSet<>(blobProcessorList);
+    this.startTime = startTime;
+    this.barrierTimeout = barrierTimeout;
+    this.errorHandler = errorHandler;
+    this.processorId = pid;
+    this.currentJMVersion = currentJMVersion;
+  }
+
+  /**
+   * Starts polling the table. The first poll runs after 5 seconds. After that, each poll starts 5 seconds
+   * after the previous one finished.
+   * Any exception thrown by a poll is logged and reported to the error handler.
+   *
+   * @return the future of the periodic polling task
+   */
+  @Override
+  public ScheduledFuture scheduleTask() {
+    return scheduler.scheduleWithFixedDelay(() -> {
+      try {
+        if (!table.getEntity(currentJMVersion.get(), processorId).getIsLeader()) {
+          LOG.info("Not the leader anymore. Shutting down LeaderBarrierCompleteScheduler.");
+          barrierTimeout.set(true);
+          listener.onStateChange();
+        } else {
+          LOG.info("Leader checking for barrier state");
+          // Get processor IDs listed in the table that have the new job model version.
+          Iterable<ProcessorEntity> tableList = table.getEntitiesWithPartition(nextJMVersion);
+          Set<String> tableProcessors = new HashSet<>();
+          for (ProcessorEntity entity : tableList) {
+            tableProcessors.add(entity.getRowKey());
+          }
+          LOG.info("List of live processors as seen on the blob = {}", blobProcessorSet);
+          LOG.info("List of live processors as seen in the table = {}", tableProcessors);
+          // The timeout takes precedence: if it has expired, the barrier is reported as timed out even when
+          // the sets happen to match on this poll.
+          if ((System.currentTimeMillis() - startTime) > (BARRIER_TIMEOUT_SEC * 1000)) {
+            barrierTimeout.set(true);
+            listener.onStateChange();
+          } else if (blobProcessorSet.equals(tableProcessors)) {
+            listener.onStateChange();
+          }
+        }
+      } catch (Exception e) {
+        LOG.error("Exception in LeaderBarrierCompleteScheduler.", e);
+        errorHandler.accept("Exception in LeaderBarrierCompleteScheduler. Stopping the processor...");
+      }
+    }, BARRIER_REACHED_DELAY_SEC, BARRIER_REACHED_DELAY_SEC, TimeUnit.SECONDS);
+  }
+
+  /** Sets the listener notified when the barrier completes, times out, or leadership is lost. */
+  @Override
+  public void setStateChangeListener(SchedulerStateChangeListener listener) {
+    this.listener = listener;
+  }
+
+  /** Stops polling immediately, interrupting a poll that is in progress. */
+  @Override
+  public void shutdown() {
+    LOG.info("Shutting down LeaderBarrierCompleteScheduler Scheduler.");
+    scheduler.shutdownNow();
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/c3b447ec/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/LeaderLivenessCheckScheduler.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/LeaderLivenessCheckScheduler.java b/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/LeaderLivenessCheckScheduler.java
new file mode 100644
index 0000000..e0fa448
--- /dev/null
+++ b/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/LeaderLivenessCheckScheduler.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator.scheduler;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import org.apache.samza.util.BlobUtils;
+import org.apache.samza.coordinator.data.ProcessorEntity;
+import org.apache.samza.util.TableUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Scheduler class invoked by each processor to check if the leader is alive.
+ * Checks every 30 seconds.
+ * If a processor row hasn't been updated since 30 seconds, the system assumes that the processor has died.
+ * All time units are in SECONDS.
+ */
+public class LeaderLivenessCheckScheduler implements TaskScheduler {
+ private static final Logger LOG = LoggerFactory.getLogger(LeaderLivenessCheckScheduler.class);
+ private static final long LIVENESS_CHECK_DELAY_SEC = 10;
+ private static final long LIVENESS_DEBOUNCE_TIME_SEC = 30;
+ private static final ThreadFactory
+ PROCESSOR_THREAD_FACTORY = new ThreadFactoryBuilder().setNameFormat("LeaderLivenessCheckScheduler-%d").build();
+ private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(PROCESSOR_THREAD_FACTORY);
+ private final TableUtils table;
+ private final AtomicReference<String> currentJMVersion;
+ private final BlobUtils blob;
+ private final Consumer<String> errorHandler;
+ private final String initialState;
+ private SchedulerStateChangeListener listener = null;
+
+ public LeaderLivenessCheckScheduler(Consumer<String> errorHandler, TableUtils table, BlobUtils blob, AtomicReference<String> currentJMVersion, String initialState) {
+ this.table = table;
+ this.blob = blob;
+ this.currentJMVersion = currentJMVersion;
+ this.initialState = initialState;
+ this.errorHandler = errorHandler;
+ }
+
+ @Override
+ public ScheduledFuture scheduleTask() {
+ return scheduler.scheduleWithFixedDelay(() -> {
+ try {
+ LOG.info("Checking for leader liveness");
+ if (!checkIfLeaderAlive()) {
+ listener.onStateChange();
+ }
+ } catch (Exception e) {
+ errorHandler.accept("Exception in Leader Liveness Check Scheduler. Stopping the processor...");
+ }
+ }, LIVENESS_CHECK_DELAY_SEC, LIVENESS_CHECK_DELAY_SEC, TimeUnit.SECONDS);
+ }
+
+ @Override
+ public void setStateChangeListener(SchedulerStateChangeListener listener) {
+ this.listener = listener;
+ }
+
+ private boolean checkIfLeaderAlive() {
+ String currJMV = currentJMVersion.get();
+ String blobJMV = blob.getJobModelVersion();
+ //Get the leader processor row from the table.
+ Iterable<ProcessorEntity> tableList = table.getEntitiesWithPartition(currJMV);
+ ProcessorEntity leader = null, nextLeader = null;
+ for (ProcessorEntity entity: tableList) {
+ if (entity.getIsLeader()) {
+ leader = entity;
+ break;
+ }
+ }
+ int currJMVInt = 0;
+ if (!currJMV.equals(initialState)) {
+ currJMVInt = Integer.valueOf(currJMV);
+ }
+ if (Integer.valueOf(blobJMV) > currJMVInt) {
+ for (ProcessorEntity entity : table.getEntitiesWithPartition(blobJMV)) {
+ if (entity.getIsLeader()) {
+ nextLeader = entity;
+ break;
+ }
+ }
+ }
+ // Check if row hasn't been updated since 30 seconds.
+ if ((leader == null || (System.currentTimeMillis() - leader.getTimestamp().getTime() >= (
+ LIVENESS_DEBOUNCE_TIME_SEC * 1000))) && nextLeader == null) {
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ public void shutdown() {
+ LOG.info("Shutting down LeaderLivenessCheckScheduler Scheduler.");
+ scheduler.shutdownNow();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/c3b447ec/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/LivenessCheckScheduler.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/LivenessCheckScheduler.java b/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/LivenessCheckScheduler.java
new file mode 100644
index 0000000..d4715f3
--- /dev/null
+++ b/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/LivenessCheckScheduler.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator.scheduler;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import org.apache.samza.util.BlobUtils;
+import org.apache.samza.util.TableUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Scheduler class invoked by the leader to check for changes in the list of live processors.
+ * Checks every 30 seconds.
+ * If a processor row hasn't been updated since 30 seconds, the system assumes that the processor has died.
+ * All time units are in SECONDS.
+ */
+public class LivenessCheckScheduler implements TaskScheduler {
+
+ private static final Logger LOG = LoggerFactory.getLogger(LivenessCheckScheduler.class);
+ private static final long LIVENESS_CHECK_DELAY_SEC = 5;
+ private static final ThreadFactory
+ PROCESSOR_THREAD_FACTORY = new ThreadFactoryBuilder().setNameFormat("LivenessCheckScheduler-%d").build();
+ private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(PROCESSOR_THREAD_FACTORY);
+ private final TableUtils table;
+ private final BlobUtils blob;
+ private final AtomicReference<String> currentJMVersion;
+ private final AtomicReference<List<String>> liveProcessorsList = new AtomicReference<>(null);
+ private final Consumer<String> errorHandler;
+ private SchedulerStateChangeListener listener = null;
+ private final String processorId;
+
+ public LivenessCheckScheduler(Consumer<String> errorHandler, TableUtils table, BlobUtils blob, AtomicReference<String> currentJMVersion, final String pid) {
+ this.table = table;
+ this.blob = blob;
+ this.currentJMVersion = currentJMVersion;
+ this.errorHandler = errorHandler;
+ this.processorId = pid;
+ }
+
+ @Override
+ public ScheduledFuture scheduleTask() {
+ return scheduler.scheduleWithFixedDelay(() -> {
+ try {
+ if (!table.getEntity(currentJMVersion.get(), processorId).getIsLeader()) {
+ LOG.info("Not the leader anymore. Shutting down LivenessCheckScheduler.");
+ scheduler.shutdownNow();
+ return;
+ }
+ LOG.info("Checking for list of live processors");
+ //Get the list of live processors published on the blob.
+ Set<String> currProcessors = new HashSet<>(blob.getLiveProcessorList());
+ //Get the list of live processors from the table. This is the current system state.
+ Set<String> liveProcessors = table.getActiveProcessorsList(currentJMVersion);
+ //Invoke listener if the table list is not consistent with the blob list.
+ if (!liveProcessors.equals(currProcessors)) {
+ liveProcessorsList.getAndSet(new ArrayList<>(liveProcessors));
+ listener.onStateChange();
+ }
+ } catch (Exception e) {
+ errorHandler.accept("Exception in Liveness Check Scheduler. Stopping the processor...");
+ }
+ }, LIVENESS_CHECK_DELAY_SEC, LIVENESS_CHECK_DELAY_SEC, TimeUnit.SECONDS);
+ }
+
+ @Override
+ public void setStateChangeListener(SchedulerStateChangeListener listener) {
+ this.listener = listener;
+ }
+
+ public AtomicReference<List<String>> getLiveProcessors() {
+ return liveProcessorsList;
+ }
+
+ @Override
+ public void shutdown() {
+ LOG.info("Shutting down LivenessCheckScheduler Scheduler.");
+ scheduler.shutdownNow();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/c3b447ec/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/RenewLeaseScheduler.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/RenewLeaseScheduler.java b/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/RenewLeaseScheduler.java
new file mode 100644
index 0000000..f158122
--- /dev/null
+++ b/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/RenewLeaseScheduler.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator.scheduler;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import org.apache.samza.util.LeaseBlobManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Scheduler class to keep renewing the lease once an entity has acquired it.
+ * Renews every 45 seconds.
+ * All time units are in SECONDS.
+ */
+public class RenewLeaseScheduler implements TaskScheduler {
+
+ private static final Logger LOG = LoggerFactory.getLogger(RenewLeaseScheduler.class);
+ private static final long RENEW_LEASE_DELAY_SEC = 45;
+ private static final ThreadFactory
+ PROCESSOR_THREAD_FACTORY = new ThreadFactoryBuilder().setNameFormat("RenewLeaseScheduler-%d").build();
+ private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(PROCESSOR_THREAD_FACTORY);
+ private final LeaseBlobManager leaseBlobManager;
+ private final AtomicReference<String> leaseId;
+ private final Consumer<String> errorHandler;
+
+ public RenewLeaseScheduler(Consumer<String> errorHandler, LeaseBlobManager leaseBlobManager, AtomicReference<String> leaseId) {
+ this.leaseBlobManager = leaseBlobManager;
+ this.leaseId = leaseId;
+ this.errorHandler = errorHandler;
+ }
+
+ @Override
+ public ScheduledFuture scheduleTask() {
+ return scheduler.scheduleWithFixedDelay(() -> {
+ try {
+ LOG.info("Renewing lease");
+ boolean status = leaseBlobManager.renewLease(leaseId.get());
+ if (!status) {
+ errorHandler.accept("Unable to renew lease. Continuing as non-leader.");
+ }
+ } catch (Exception e) {
+ errorHandler.accept("Exception in Renew Lease Scheduler. Continuing as non-leader.");
+ }
+ }, RENEW_LEASE_DELAY_SEC, RENEW_LEASE_DELAY_SEC, TimeUnit.SECONDS);
+ }
+
+ @Override
+ public void setStateChangeListener(SchedulerStateChangeListener listener) {}
+
+ @Override
+ public void shutdown() {
+ LOG.info("Shutting down RenewLeaseScheduler Scheduler.");
+ scheduler.shutdownNow();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/c3b447ec/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/SchedulerStateChangeListener.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/SchedulerStateChangeListener.java b/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/SchedulerStateChangeListener.java
new file mode 100644
index 0000000..95fc4e1
--- /dev/null
+++ b/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/SchedulerStateChangeListener.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator.scheduler;
+
+/**
+ * Listener interface for Azure Job Coordinator, to track state changes and take necessary actions.
+ * A {@link TaskScheduler} invokes it when the condition that scheduler monitors is detected.
+ * It has a single abstract method, so it can be implemented with a lambda.
+ */
+@FunctionalInterface
+public interface SchedulerStateChangeListener {
+
+  /**
+   * Called by a scheduler when the state it monitors has changed.
+   */
+  void onStateChange();
+
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/c3b447ec/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/TaskScheduler.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/TaskScheduler.java b/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/TaskScheduler.java
new file mode 100644
index 0000000..63d6e24
--- /dev/null
+++ b/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/TaskScheduler.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator.scheduler;
+
+import java.util.concurrent.ScheduledFuture;
+
+
+/**
+ * Interface for scheduling tasks for Azure Job Coordinator.
+ * An implementation runs a periodic task, and where applicable reports detected changes to a
+ * {@link SchedulerStateChangeListener}.
+ */
+public interface TaskScheduler {
+
+  /**
+   * Starts the periodic task.
+   *
+   * @return the future representing the scheduled task
+   */
+  ScheduledFuture scheduleTask();
+
+  /**
+   * Registers the listener to notify when the monitored state changes.
+   * Implementations that never report state changes may ignore it.
+   *
+   * @param listener the listener to notify
+   */
+  void setStateChangeListener(SchedulerStateChangeListener listener);
+
+  /**
+   * Shuts the scheduler down so that no further runs of the task are started.
+   */
+  void shutdown();
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/c3b447ec/samza-azure/src/main/java/org/apache/samza/util/BlobUtils.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/util/BlobUtils.java b/samza-azure/src/main/java/org/apache/samza/util/BlobUtils.java
new file mode 100644
index 0000000..85e4273
--- /dev/null
+++ b/samza-azure/src/main/java/org/apache/samza/util/BlobUtils.java
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.util;
+
+import com.microsoft.azure.storage.AccessCondition;
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.CloudBlobClient;
+import com.microsoft.azure.storage.blob.CloudBlobContainer;
+import com.microsoft.azure.storage.blob.CloudPageBlob;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URISyntaxException;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.samza.AzureClient;
+import org.apache.samza.AzureException;
+import org.apache.samza.coordinator.data.JobModelBundle;
+import org.apache.samza.SamzaException;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.serializers.model.SamzaObjectMapper;
+import org.eclipse.jetty.http.HttpStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Client side class that has reference to Azure blob storage.
+ * Used for writing and reading from the blob.
+ * Every write requires a valid lease ID.
+ *
+ * <p>The page blob is split into consecutive fixed-size regions, each zero-padded to its full size on write:
+ * the serialized {@link JobModelBundle} at offset 0, followed by the barrier state, followed by the list of
+ * live processors.
+ */
+public class BlobUtils {
+
+  private static final Logger LOG = LoggerFactory.getLogger(BlobUtils.class);
+  // NOTE: Azure page blob writes must be aligned to 512-byte pages, so keep these sizes multiples of 512.
+  private static final long JOB_MODEL_BLOCK_SIZE = 1024000;
+  private static final long BARRIER_STATE_BLOCK_SIZE = 1024;
+  private static final long PROCESSOR_LIST_BLOCK_SIZE = 1024;
+  // Byte offsets of the regions within the page blob; the regions are laid out back to back.
+  private static final long JOB_MODEL_OFFSET = 0;
+  private static final long BARRIER_STATE_OFFSET = JOB_MODEL_OFFSET + JOB_MODEL_BLOCK_SIZE;
+  private static final long PROCESSOR_LIST_OFFSET = BARRIER_STATE_OFFSET + BARRIER_STATE_BLOCK_SIZE;
+  private CloudBlobClient blobClient;
+  private CloudBlobContainer container;
+  private CloudPageBlob blob;
+
+  /**
+   * Creates an object of BlobUtils. It creates the container and page blob if they don't exist already.
+   * @param client Client handle for access to Azure Storage account.
+   * @param containerName Name of container inside which we want the blob to reside.
+   * @param blobName Name of the blob to be managed.
+   * @param length Length of the page blob.
+   * @throws AzureException If an Azure storage service error occurred, or when the container name or blob name is invalid.
+   */
+  public BlobUtils(AzureClient client, String containerName, String blobName, long length) {
+    this.blobClient = client.getBlobClient();
+    try {
+      this.container = blobClient.getContainerReference(containerName);
+      container.createIfNotExists();
+      this.blob = container.getPageBlobReference(blobName);
+      if (!blob.exists()) {
+        // The if-not-exists condition makes a concurrent creation by another processor surface as a 409 below.
+        blob.create(length, AccessCondition.generateIfNotExistsCondition(), null, null);
+      }
+    } catch (URISyntaxException e) {
+      LOG.error("Container name: " + containerName + " or blob name: " + blobName + " invalid.", e);
+      throw new AzureException(e);
+    } catch (StorageException e) {
+      int httpStatusCode = e.getHttpStatusCode();
+      if (httpStatusCode == HttpStatus.CONFLICT_409) {
+        LOG.info("The blob you're trying to create exists already.", e);
+      } else {
+        LOG.error("Azure Storage Exception!", e);
+        throw new AzureException(e);
+      }
+    }
+  }
+
+  /**
+   * Writes the job model to the blob.
+   * Write is successful only if the lease ID passed is valid and the processor holds the lease.
+   * Called by the leader.
+   * @param prevJM Previous job model that the processor was operating on.
+   * @param currJM Current job model that the processor is operating on.
+   * @param prevJMV Previous job model version that the processor was operating on.
+   * @param currJMV Current job model version that the processor is operating on.
+   * @param leaseId LeaseID of the lease that the processor holds on the blob. Null if there is no lease.
+   * @return true if write to the blob is successful, false if leaseID is null, the serialized bundle does not fit
+   *         in its region, or an Azure storage service error or IO exception occurred.
+   */
+  public boolean publishJobModel(JobModel prevJM, JobModel currJM, String prevJMV, String currJMV, String leaseId) {
+    if (leaseId == null) {
+      return false;
+    }
+    try {
+      JobModelBundle bundle = new JobModelBundle(prevJM, currJM, prevJMV, currJMV);
+      byte[] data = SamzaObjectMapper.getObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsBytes(bundle);
+      uploadBlock(data, JOB_MODEL_OFFSET, JOB_MODEL_BLOCK_SIZE, leaseId);
+      LOG.info("Uploaded jobModel version {} to blob", currJMV);
+      return true;
+    } catch (StorageException | IOException e) {
+      LOG.error("JobModel publish failed for version = " + currJMV, e);
+      return false;
+    }
+  }
+
+  /**
+   * Reads the current job model from the blob.
+   * @return The current job model published on the blob. Returns null when job model details not found on the blob.
+   * @throws AzureException in getJobModelBundle() if an Azure storage service error occurred.
+   * @throws SamzaException in getJobModelBundle() if data retrieved from blob could not be parsed by SamzaObjectMapper.
+   */
+  public JobModel getJobModel() {
+    LOG.info("Reading the job model from blob.");
+    JobModelBundle jmBundle = getJobModelBundle();
+    if (jmBundle == null) {
+      LOG.error("Job Model details don't exist on the blob.");
+      return null;
+    }
+    return jmBundle.getCurrJobModel();
+  }
+
+  /**
+   * Reads the current job model version from the blob.
+   * @return Current job model version published on the blob. Returns null when job model details not found on the blob.
+   * @throws AzureException in getJobModelBundle() if an Azure storage service error occurred.
+   * @throws SamzaException in getJobModelBundle() if data retrieved from blob could not be parsed by SamzaObjectMapper.
+   */
+  public String getJobModelVersion() {
+    LOG.info("Reading the job model version from blob.");
+    JobModelBundle jmBundle = getJobModelBundle();
+    if (jmBundle == null) {
+      LOG.error("Job Model details don't exist on the blob.");
+      return null;
+    }
+    return jmBundle.getCurrJobModelVersion();
+  }
+
+  /**
+   * Writes the barrier state to the blob.
+   * Write is successful only if the lease ID passed is valid and the processor holds the lease.
+   * Called only by the leader.
+   * @param state Barrier state to be published to the blob.
+   * @param leaseId LeaseID of the valid lease that the processor holds on the blob. Null if there is no lease.
+   * @return true if write to the blob is successful, false if leaseID is null, the serialized state does not fit
+   *         in its region, or an Azure storage service error or IO exception occurred.
+   */
+  public boolean publishBarrierState(String state, String leaseId) {
+    if (leaseId == null) {
+      return false;
+    }
+    try {
+      byte[] data = SamzaObjectMapper.getObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsBytes(state);
+      uploadBlock(data, BARRIER_STATE_OFFSET, BARRIER_STATE_BLOCK_SIZE, leaseId);
+      LOG.info("Uploaded barrier state {} to blob", state);
+      return true;
+    } catch (StorageException | IOException e) {
+      LOG.error("Barrier state " + state + " publish failed", e);
+      return false;
+    }
+  }
+
+  /**
+   * Reads the current barrier state from the blob.
+   * @return Barrier state published on the blob.
+   * @throws AzureException If an Azure storage service error occurred.
+   * @throws SamzaException If data retrieved from blob could not be parsed by SamzaObjectMapper.
+   */
+  public String getBarrierState() {
+    LOG.info("Reading the barrier state from blob.");
+    byte[] data;
+    try {
+      data = downloadBlock(BARRIER_STATE_OFFSET, BARRIER_STATE_BLOCK_SIZE);
+    } catch (StorageException e) {
+      LOG.error("Failed to read barrier state from blob.", e);
+      throw new AzureException(e);
+    }
+    try {
+      return SamzaObjectMapper.getObjectMapper().readValue(data, String.class);
+    } catch (IOException e) {
+      LOG.error("Failed to parse byte data for barrier state retrieved from the blob.", e);
+      throw new SamzaException(e);
+    }
+  }
+
+  /**
+   * Writes the list of live processors in the system to the blob.
+   * Write is successful only if the lease ID passed is valid and the processor holds the lease.
+   * Called only by the leader.
+   * @param processors List of live processors to be published on the blob.
+   * @param leaseId LeaseID of the valid lease that the processor holds on the blob. Null if there is no lease.
+   * @return true if write to the blob is successful, false if leaseID is null, the serialized list does not fit
+   *         in its region, or an Azure storage service error or IO exception occurred.
+   */
+  public boolean publishLiveProcessorList(List<String> processors, String leaseId) {
+    if (leaseId == null) {
+      return false;
+    }
+    try {
+      byte[] data = SamzaObjectMapper.getObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsBytes(processors);
+      // Pad to the processor-list region size so the stream length matches the upload length.
+      uploadBlock(data, PROCESSOR_LIST_OFFSET, PROCESSOR_LIST_BLOCK_SIZE, leaseId);
+      LOG.info("Uploaded list of live processors to blob.");
+      return true;
+    } catch (StorageException | IOException e) {
+      LOG.error("Processor list: " + processors + " publish failed", e);
+      return false;
+    }
+  }
+
+  /**
+   * Reads the list of live processors published on the blob.
+   * @return String list of live processors.
+   * @throws AzureException If an Azure storage service error occurred.
+   * @throws SamzaException If data retrieved from blob could not be parsed by SamzaObjectMapper.
+   */
+  public List<String> getLiveProcessorList() {
+    LOG.info("Reading the list of live processors from blob.");
+    byte[] data;
+    try {
+      data = downloadBlock(PROCESSOR_LIST_OFFSET, PROCESSOR_LIST_BLOCK_SIZE);
+    } catch (StorageException e) {
+      LOG.error("Failed to read the list of live processors from the blob.", e);
+      throw new AzureException(e);
+    }
+    try {
+      // The list was written from a List<String>, so its elements are deserialized as strings.
+      @SuppressWarnings("unchecked")
+      List<String> list = SamzaObjectMapper.getObjectMapper().readValue(data, List.class);
+      return list;
+    } catch (IOException e) {
+      LOG.error("Failed to parse byte data for live processor list retrieved from the blob", e);
+      throw new SamzaException(e);
+    }
+  }
+
+  public CloudBlobClient getBlobClient() {
+    return this.blobClient;
+  }
+
+  public CloudBlobContainer getBlobContainer() {
+    return this.container;
+  }
+
+  public CloudPageBlob getBlob() {
+    return this.blob;
+  }
+
+  /**
+   * Reads and deserializes the job model bundle region of the blob.
+   * @throws AzureException If an Azure storage service error occurred.
+   * @throws SamzaException If the data could not be parsed by SamzaObjectMapper.
+   */
+  private JobModelBundle getJobModelBundle() {
+    byte[] data;
+    try {
+      data = downloadBlock(JOB_MODEL_OFFSET, JOB_MODEL_BLOCK_SIZE);
+    } catch (StorageException e) {
+      LOG.error("Failed to read JobModel details from the blob.", e);
+      throw new AzureException(e);
+    }
+    try {
+      return SamzaObjectMapper.getObjectMapper().readValue(data, JobModelBundle.class);
+    } catch (IOException e) {
+      LOG.error("Failed to parse byte data for JobModel details retrieved from the blob", e);
+      throw new SamzaException(e);
+    }
+  }
+
+  /**
+   * Zero-pads serialized data to the full size of its region and writes it to the blob under the given lease.
+   * @throws IOException If the data is larger than the region; writing a truncated prefix would leave
+   *                     unparseable data on the blob.
+   * @throws StorageException If the upload fails, including when the lease is not held.
+   */
+  private void uploadBlock(byte[] data, long offset, long blockSize, String leaseId)
+      throws StorageException, IOException {
+    if (data.length > blockSize) {
+      throw new IOException("Serialized data of " + data.length + " bytes exceeds the block size of "
+          + blockSize + " bytes.");
+    }
+    byte[] pageData = Arrays.copyOf(data, (int) blockSize);
+    // uploadPages is only successful when the AccessCondition provided has an active and valid lease ID. It fails otherwise.
+    try (InputStream is = new ByteArrayInputStream(pageData)) {
+      blob.uploadPages(is, offset, blockSize, AccessCondition.generateLeaseCondition(leaseId), null, null);
+    }
+  }
+
+  /**
+   * Downloads {@code blockSize} bytes of the blob starting at {@code offset}.
+   */
+  private byte[] downloadBlock(long offset, long blockSize) throws StorageException {
+    byte[] data = new byte[(int) blockSize];
+    blob.downloadRangeToByteArray(offset, blockSize, data, 0);
+    return data;
+  }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/c3b447ec/samza-azure/src/main/java/org/apache/samza/util/LeaseBlobManager.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/util/LeaseBlobManager.java b/samza-azure/src/main/java/org/apache/samza/util/LeaseBlobManager.java
new file mode 100644
index 0000000..dffb7ae
--- /dev/null
+++ b/samza-azure/src/main/java/org/apache/samza/util/LeaseBlobManager.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.util;
+
+import com.microsoft.azure.storage.AccessCondition;
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.blob.CloudPageBlob;
+import org.apache.samza.AzureException;
+import org.eclipse.jetty.http.HttpStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Helper class for lease blob operations.
+ * Wraps acquiring, renewing and releasing a lease on a single Azure page blob.
+ */
+public class LeaseBlobManager {
+
+  private static final Logger LOG = LoggerFactory.getLogger(LeaseBlobManager.class);
+  private final CloudPageBlob leaseBlob;
+
+  /**
+   * @param leaseBlob Page blob on which leases are acquired, renewed and released.
+   */
+  public LeaseBlobManager(CloudPageBlob leaseBlob) {
+    this.leaseBlob = leaseBlob;
+  }
+
+  /**
+   * Acquires a lease on the blob.
+   * @param leaseTimeInSec The time in seconds you want to acquire the lease for.
+   * @param leaseId Proposed ID you want to acquire the lease with, null if not proposed.
+   * @return String that represents lease ID. Null if acquireLease is unsuccessful because the blob is leased already.
+   * @throws AzureException If an Azure storage service error occurred. This includes the case where the blob you're trying to lease does not exist.
+   */
+  public String acquireLease(int leaseTimeInSec, String leaseId) {
+    try {
+      String id = leaseBlob.acquireLease(leaseTimeInSec, leaseId);
+      LOG.info("Acquired lease with lease id = " + id);
+      return id;
+    } catch (StorageException storageException) {
+      int httpStatusCode = storageException.getHttpStatusCode();
+      if (httpStatusCode == HttpStatus.CONFLICT_409) {
+        // The blob already has an active lease (e.g. held by another processor); this is an expected outcome.
+        LOG.info("The blob you're trying to acquire is leased already. {}", storageException.getMessage());
+        return null;
+      }
+      if (httpStatusCode == HttpStatus.NOT_FOUND_404) {
+        LOG.error("The blob you're trying to lease does not exist.", storageException);
+      } else {
+        LOG.error("Error acquiring lease!", storageException);
+      }
+      throw new AzureException(storageException);
+    }
+  }
+
+  /**
+   * Renews the lease on the blob.
+   * Failures are logged and reported through the return value; no exception is thrown.
+   * @param leaseId ID of the lease to be renewed.
+   * @return True if lease was renewed successfully, false otherwise.
+   */
+  public boolean renewLease(String leaseId) {
+    try {
+      leaseBlob.renewLease(AccessCondition.generateLeaseCondition(leaseId));
+      return true;
+    } catch (StorageException storageException) {
+      LOG.error("Wasn't able to renew lease with lease id: " + leaseId, storageException);
+      return false;
+    }
+  }
+
+  /**
+   * Releases the lease on the blob.
+   * Failures are logged and reported through the return value; no exception is thrown.
+   * @param leaseId ID of the lease to be released.
+   * @return True if released successfully, false otherwise.
+   */
+  public boolean releaseLease(String leaseId) {
+    try {
+      leaseBlob.releaseLease(AccessCondition.generateLeaseCondition(leaseId));
+      return true;
+    } catch (StorageException storageException) {
+      LOG.error("Wasn't able to release lease with lease id: " + leaseId, storageException);
+      return false;
+    }
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/c3b447ec/samza-azure/src/main/java/org/apache/samza/util/TableUtils.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/util/TableUtils.java b/samza-azure/src/main/java/org/apache/samza/util/TableUtils.java
new file mode 100644
index 0000000..f49ce27
--- /dev/null
+++ b/samza-azure/src/main/java/org/apache/samza/util/TableUtils.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.util;
+
+import com.microsoft.azure.storage.StorageException;
+import com.microsoft.azure.storage.table.CloudTable;
+import com.microsoft.azure.storage.table.CloudTableClient;
+import com.microsoft.azure.storage.table.TableOperation;
+import com.microsoft.azure.storage.table.TableQuery;
+import java.net.URISyntaxException;
+import java.util.HashSet;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.samza.AzureClient;
+import org.apache.samza.AzureException;
+import org.apache.samza.coordinator.data.ProcessorEntity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Client side class that has a reference to Azure Table Storage.
+ * Enables the user to add or delete information from the table, make updates to the table and retrieve information from the table.
+ * Every row in a table is uniquely identified by a combination of the PARTIITON KEY and ROW KEY.
+ * PARTITION KEY = Group ID = Job Model Version (for this case).
+ * ROW KEY = Unique entity ID for a group = Processor ID (for this case).
+ */
+public class TableUtils {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TableUtils.class);
+ private static final String PARTITION_KEY = "PartitionKey";
+ private static final long LIVENESS_DEBOUNCE_TIME_SEC = 30;
+ private final String initialState;
+ private final CloudTableClient tableClient;
+ private final CloudTable table;
+
+ public TableUtils(AzureClient client, String tableName, String initialState) {
+ this.initialState = initialState;
+ tableClient = client.getTableClient();
+ try {
+ table = tableClient.getTableReference(tableName);
+ table.createIfNotExists();
+ } catch (URISyntaxException e) {
+ LOG.error("\nConnection string specifies an invalid URI.", e);
+ throw new AzureException(e);
+ } catch (StorageException e) {
+ LOG.error("Azure storage exception.", e);
+ throw new AzureException(e);
+ }
+ }
+
+ /**
+ * Add a row which denotes an active processor to the processor table.
+ * @param jmVersion Job model version that the processor is operating on.
+ * @param pid Unique processor ID.
+ * @param isLeader Denotes whether the processor is a leader or not.
+ * @throws AzureException If an Azure storage service error occurred.
+ */
+ public void addProcessorEntity(String jmVersion, String pid, boolean isLeader) {
+ ProcessorEntity entity = new ProcessorEntity(jmVersion, pid);
+ entity.setIsLeader(isLeader);
+ entity.updateLiveness();
+ TableOperation add = TableOperation.insert(entity);
+ try {
+ table.execute(add);
+ } catch (StorageException e) {
+ LOG.error("Azure storage exception while adding processor entity with job model version: " + jmVersion + "and pid: " + pid, e);
+ throw new AzureException(e);
+ }
+ }
+
+ /**
+ * Retrieve a particular row in the processor table, given the partition key and the row key.
+ * @param jmVersion Job model version of the processor row to be retrieved.
+ * @param pid Unique processor ID of the processor row to be retrieved.
+ * @return An instance of required processor entity. Null if does not exist.
+ * @throws AzureException If an Azure storage service error occurred.
+ */
+ public ProcessorEntity getEntity(String jmVersion, String pid) {
+ try {
+ TableOperation retrieveEntity = TableOperation.retrieve(jmVersion, pid, ProcessorEntity.class);
+ ProcessorEntity entity = table.execute(retrieveEntity).getResultAsType();
+ return entity;
+ } catch (StorageException e) {
+ LOG.error("Azure storage exception while retrieving processor entity with job model version: " + jmVersion + "and pid: " + pid, e);
+ throw new AzureException(e);
+ }
+ }
+
+ /**
+ * Updates the liveness value of a particular processor with a randomly generated integer, which in turn updates the last modified since timestamp of the row.
+ * @param jmVersion Job model version of the processor row to be updated.
+ * @param pid Unique processor ID of the processor row to be updated.
+ */
+ public void updateHeartbeat(String jmVersion, String pid) {
+ try {
+ Random rand = new Random();
+ TableOperation retrieveEntity = TableOperation.retrieve(jmVersion, pid, ProcessorEntity.class);
+ ProcessorEntity entity = table.execute(retrieveEntity).getResultAsType();
+ entity.updateLiveness();
+ TableOperation update = TableOperation.replace(entity);
+ table.execute(update);
+ } catch (StorageException e) {
+ LOG.error("Azure storage exception while updating heartbeat for job model version: " + jmVersion + "and pid: " + pid, e);
+ }
+ }
+
+ /**
+ * Updates the isLeader value when the processor starts or stops being a leader.
+ * @param jmVersion Job model version of the processor row to be updated.
+ * @param pid Unique processor ID of the processor row to be updated.
+ * @param isLeader Denotes whether the processor is a leader or not.
+ * @throws AzureException If an Azure storage service error occurred.
+ */
+ public void updateIsLeader(String jmVersion, String pid, boolean isLeader) {
+ try {
+ TableOperation retrieveEntity = TableOperation.retrieve(jmVersion, pid, ProcessorEntity.class);
+ ProcessorEntity entity = table.execute(retrieveEntity).getResultAsType();
+ entity.setIsLeader(isLeader);
+ TableOperation update = TableOperation.replace(entity);
+ table.execute(update);
+ } catch (StorageException e) {
+ LOG.error("Azure storage exception while updating isLeader value for job model version: " + jmVersion + "and pid: " + pid, e);
+ throw new AzureException(e);
+ }
+ }
+
+ /**
+ * Deletes a specified row in the processor table.
+ * @param jmVersion Job model version of the processor row to be deleted.
+ * @param pid Unique processor ID of the processor row to be deleted.
+ * @throws AzureException If an Azure storage service error occurred.
+ */
+ public void deleteProcessorEntity(String jmVersion, String pid) {
+ try {
+ TableOperation retrieveEntity = TableOperation.retrieve(jmVersion, pid, ProcessorEntity.class);
+ ProcessorEntity entity = table.execute(retrieveEntity).getResultAsType();
+ TableOperation remove = TableOperation.delete(entity);
+ table.execute(remove);
+ } catch (StorageException e) {
+ LOG.error("Azure storage exception while deleting processor entity with job model version: " + jmVersion + "and pid: " + pid, e);
+ throw new AzureException(e);
+ }
+ }
+
+ /**
+ * Retrieve all rows in a table with the given partition key.
+ * @param partitionKey Job model version of the processors to be retrieved.
+ * @return Iterable list of processor entities.
+ */
+ public Iterable<ProcessorEntity> getEntitiesWithPartition(String partitionKey) {
+ String partitionFilter = TableQuery.generateFilterCondition(PARTITION_KEY, TableQuery.QueryComparisons.EQUAL, partitionKey);
+ TableQuery<ProcessorEntity> partitionQuery = TableQuery.from(ProcessorEntity.class).where(partitionFilter);
+ return table.execute(partitionQuery);
+ }
+
+ /**
+ * Gets the list of all active processors that are heartbeating to the processor table.
+ * @param currentJMVersion Current job model version that the processors in the application are operating on.
+ * @return List of ids of currently active processors in the application, retrieved from the processor table.
+ */
+ public Set<String> getActiveProcessorsList(AtomicReference<String> currentJMVersion) {
+ Iterable<ProcessorEntity> tableList = getEntitiesWithPartition(currentJMVersion.get());
+ Set<String> activeProcessorsList = new HashSet<>();
+ for (ProcessorEntity entity: tableList) {
+ if (System.currentTimeMillis() - entity.getTimestamp().getTime() <= (LIVENESS_DEBOUNCE_TIME_SEC * 1000)) {
+ activeProcessorsList.add(entity.getRowKey());
+ }
+ }
+
+ Iterable<ProcessorEntity> unassignedList = getEntitiesWithPartition(initialState);
+ for (ProcessorEntity entity: unassignedList) {
+ long temp = System.currentTimeMillis() - entity.getTimestamp().getTime();
+ LOG.info("Time elapsed since last heartbeat: {}", temp);
+ if (temp <= (LIVENESS_DEBOUNCE_TIME_SEC * 1000)) {
+ activeProcessorsList.add(entity.getRowKey());
+ }
+ }
+ LOG.info("Active processors list: {}", activeProcessorsList);
+ return activeProcessorsList;
+ }
+
+ public CloudTable getTable() {
+ return table;
+ }
+
+}
\ No newline at end of file
[2/2] samza git commit: SAMZA-1378: Introduce and Implement Scheduler
Interface for Polling in Azure
Posted by na...@apache.org.
SAMZA-1378: Introduce and Implement Scheduler Interface for Polling in Azure
PR 1: AzureClient + AzureConfig
PR 2: LeaseBlobManager
PR 3: BlobUtils + JobModelBundle
PR 4: TableUtils + ProcessorEntity
PR 5: AzureLeaderElector
PR 6: Added all schedulers (current PR)
Author: PawasChhokra <Jaimatadi1$>
Author: PawasChhokra <pa...@gmail.com>
Reviewers: Navina Ramesh <na...@apache.org>
Closes #261 from PawasChhokra/AzureSchedulers
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/c3b447ec
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/c3b447ec
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/c3b447ec
Branch: refs/heads/master
Commit: c3b447ecb343ddc4e48296448127fda5dfafe913
Parents: 1d253c7
Author: Pawas Chhokra <pa...@gmail.com>
Authored: Fri Aug 18 14:38:46 2017 -0700
Committer: navina <na...@apache.org>
Committed: Fri Aug 18 14:38:46 2017 -0700
----------------------------------------------------------------------
.../main/java/org/apache/samza/AzureClient.java | 18 +-
.../main/java/org/apache/samza/AzureConfig.java | 73 ---
.../org/apache/samza/AzureLeaderElector.java | 111 ----
.../main/java/org/apache/samza/BlobUtils.java | 280 ----------
.../java/org/apache/samza/JobModelBundle.java | 61 ---
.../java/org/apache/samza/LeaseBlobManager.java | 98 ----
.../java/org/apache/samza/ProcessorEntity.java | 58 ---
.../main/java/org/apache/samza/TableUtils.java | 198 --------
.../org/apache/samza/config/AzureConfig.java | 68 +++
.../samza/coordinator/AzureJobCoordinator.java | 509 +++++++++++++++++++
.../samza/coordinator/AzureLeaderElector.java | 109 ++++
.../samza/coordinator/data/BarrierState.java | 27 +
.../samza/coordinator/data/JobModelBundle.java | 61 +++
.../samza/coordinator/data/ProcessorEntity.java | 62 +++
.../scheduler/HeartbeatScheduler.java | 81 +++
.../scheduler/JMVersionUpgradeScheduler.java | 99 ++++
.../LeaderBarrierCompleteScheduler.java | 118 +++++
.../scheduler/LeaderLivenessCheckScheduler.java | 120 +++++
.../scheduler/LivenessCheckScheduler.java | 108 ++++
.../scheduler/RenewLeaseScheduler.java | 79 +++
.../scheduler/SchedulerStateChangeListener.java | 29 ++
.../coordinator/scheduler/TaskScheduler.java | 35 ++
.../java/org/apache/samza/util/BlobUtils.java | 284 +++++++++++
.../org/apache/samza/util/LeaseBlobManager.java | 99 ++++
.../java/org/apache/samza/util/TableUtils.java | 205 ++++++++
25 files changed, 2105 insertions(+), 885 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/c3b447ec/samza-azure/src/main/java/org/apache/samza/AzureClient.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/AzureClient.java b/samza-azure/src/main/java/org/apache/samza/AzureClient.java
index 2248d12..04f8fd3 100644
--- a/samza-azure/src/main/java/org/apache/samza/AzureClient.java
+++ b/samza-azure/src/main/java/org/apache/samza/AzureClient.java
@@ -25,6 +25,7 @@ import com.microsoft.azure.storage.RetryPolicy;
import com.microsoft.azure.storage.blob.BlobRequestOptions;
import com.microsoft.azure.storage.blob.CloudBlobClient;
import com.microsoft.azure.storage.table.CloudTableClient;
+import com.microsoft.azure.storage.table.TableRequestOptions;
import java.net.URISyntaxException;
import java.security.InvalidKeyException;
import org.slf4j.Logger;
@@ -44,21 +45,26 @@ public class AzureClient {
/**
* Creates a reference to the Azure Storage account according to the connection string that the client passes.
* Also creates references to Azure Blob Storage and Azure Table Storage.
- * @param storageConnectionString Connection string to conenct to Azure Storage Account, format: "DefaultEndpointsProtocol=<https>;AccountName=<>;AccountKey=<>"
+ * @param storageConnectionString Connection string to connect to Azure Storage Account
+ * Format: DefaultEndpointsProtocol=https;AccountName="Insert your account name";AccountKey="Insert your account key"
* @throws AzureException If an Azure storage service error occurred, or when the storageConnectionString is invalid.
*/
- AzureClient(String storageConnectionString) {
+ public AzureClient(String storageConnectionString) {
try {
account = CloudStorageAccount.parse(storageConnectionString);
+ RetryPolicy retryPolicy = new RetryLinearRetry(5000, 3);
blobClient = account.createCloudBlobClient();
// Set retry policy for operations on the blob. In this case, every failed operation on the blob will be retried thrice, after 5 second intervals.
- BlobRequestOptions options = new BlobRequestOptions();
- RetryPolicy retryPolicy = new RetryLinearRetry(5000, 3);
- options.setRetryPolicyFactory(retryPolicy);
- blobClient.setDefaultRequestOptions(options);
+ BlobRequestOptions blobOptions = new BlobRequestOptions();
+ blobOptions.setRetryPolicyFactory(retryPolicy);
+ blobClient.setDefaultRequestOptions(blobOptions);
+ // Set retry policy for operations on the table. In this case, every failed operation on the table will be retried thrice, after 5 second intervals.
tableClient = account.createCloudTableClient();
+ TableRequestOptions tableOptions = new TableRequestOptions();
+ tableOptions.setRetryPolicyFactory(retryPolicy);
+ tableClient.setDefaultRequestOptions(tableOptions);
} catch (IllegalArgumentException | URISyntaxException e) {
LOG.error("Connection string {} specifies an invalid URI.", storageConnectionString);
LOG.error("Please confirm the connection string is in the Azure connection string format.");
http://git-wip-us.apache.org/repos/asf/samza/blob/c3b447ec/samza-azure/src/main/java/org/apache/samza/AzureConfig.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/AzureConfig.java b/samza-azure/src/main/java/org/apache/samza/AzureConfig.java
deleted file mode 100644
index 47873a7..0000000
--- a/samza-azure/src/main/java/org/apache/samza/AzureConfig.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza;
-
-import org.apache.samza.config.ApplicationConfig;
-import org.apache.samza.config.Config;
-import org.apache.samza.config.ConfigException;
-import org.apache.samza.config.MapConfig;
-
-/**
- * Config class for reading all user defined parameters for Azure driven coordination services.
- */
-public class AzureConfig extends MapConfig {
-
- // Connection string for Azure Storage Account, format: "DefaultEndpointsProtocol=<https>;AccountName=<>;AccountKey=<>"
- public static final String AZURE_STORAGE_CONNECT = "azure.storage.connect";
- public static final String AZURE_PAGEBLOB_LENGTH = "job.coordinator.azure.blob.length";
- public static final long DEFAULT_AZURE_PAGEBLOB_LENGTH = 5120000;
-
- private static String containerName;
- private static String blobName;
- private static String tableName;
-
- public AzureConfig(Config config) {
- super(config);
- ApplicationConfig appConfig = new ApplicationConfig(config);
- //Remove all non-alphanumeric characters from id as table name does not allow them.
- String id = appConfig.getGlobalAppId().replaceAll("[^A-Za-z0-9]", "");
- containerName = "samzacontainer" + id;
- blobName = "samzablob" + id;
- tableName = "samzatable" + id;
- }
-
- public String getAzureConnect() {
- if (!containsKey(AZURE_STORAGE_CONNECT)) {
- throw new ConfigException("Missing " + AZURE_STORAGE_CONNECT + " config!");
- }
- return get(AZURE_STORAGE_CONNECT);
- }
-
- public String getAzureContainerName() {
- return containerName;
- }
-
- public String getAzureBlobName() {
- return blobName;
- }
-
- public long getAzureBlobLength() {
- return getLong(AZURE_PAGEBLOB_LENGTH, DEFAULT_AZURE_PAGEBLOB_LENGTH);
- }
-
- public String getAzureTableName() {
- return tableName;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/c3b447ec/samza-azure/src/main/java/org/apache/samza/AzureLeaderElector.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/AzureLeaderElector.java b/samza-azure/src/main/java/org/apache/samza/AzureLeaderElector.java
deleted file mode 100644
index efa8ea1..0000000
--- a/samza-azure/src/main/java/org/apache/samza/AzureLeaderElector.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza;
-
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-import org.apache.samza.coordinator.LeaderElector;
-import org.apache.samza.coordinator.LeaderElectorListener;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * Class to facilitate leader election in Azure.
- * The processor that acquires the lease on the blob becomes the leader.
- * The lease ID is null initially. It is generated by Azure when the processor acquires the lease, and updated accordingly.
- * Every processor requires a valid active lease ID in order to perform successful write and delete operations on the blob.
- * Read operations from the blob are not dependent on the lease ID.
- */
-public class AzureLeaderElector implements LeaderElector {
-
- private static final Logger LOG = LoggerFactory.getLogger(AzureLeaderElector.class);
- private static final int LEASE_TIME_IN_SEC = 60;
- private final LeaseBlobManager leaseBlobManager;
- private LeaderElectorListener leaderElectorListener = null;
- private final AtomicReference<String> leaseId;
- private final AtomicBoolean isLeader;
-
- public AzureLeaderElector(LeaseBlobManager leaseBlobManager) {
- this.isLeader = new AtomicBoolean(false);
- this.leaseBlobManager = leaseBlobManager;
- this.leaseId = new AtomicReference<>(null);
- }
-
- @Override
- public void setLeaderElectorListener(LeaderElectorListener listener) {
- this.leaderElectorListener = listener;
- }
-
- /**
- * Tries to become the leader by acquiring a lease on the blob.
- * The acquireLease operation has a retry policy where upon failure, the operation is tried 3 times at 5 second intervals.
- * Invokes the listener on becoming the leader.
- */
- @Override
- public void tryBecomeLeader() {
- try {
- leaseId.getAndSet(leaseBlobManager.acquireLease(LEASE_TIME_IN_SEC, leaseId.get()));
- if (leaseId.get() != null) {
- LOG.info("Became leader with lease ID {}.", leaseId.get());
- isLeader.set(true);
- if (leaderElectorListener != null) {
- leaderElectorListener.onBecomingLeader();
- }
- }
- } catch (AzureException e) {
- LOG.error("Error while trying to acquire lease.", e);
- }
- }
-
- /**
- * Releases the lease in order to resign leadership. It also stops all schedulers scheduled by the leader.
- * The releaseLease operation has a retry policy where upon failure, the operation is tried 3 times at 5 second intervals.
- */
- @Override
- public void resignLeadership() {
- if (isLeader.get()) {
- leaseBlobManager.releaseLease(leaseId.get());
- isLeader.set(false);
- LOG.info("Resigning leadership with lease ID {}", leaseId.get());
- leaseId.getAndSet(null);
- } else {
- LOG.info("Can't release the lease because it is not the leader and does not hold an active lease.");
- }
- }
-
- /**
- * Checks whether it's a leader
- * @return true if it is the leader, false otherwise
- */
- @Override
- public boolean amILeader() {
- return isLeader.get();
- }
-
- public AtomicReference<String> getLeaseId() {
- return leaseId;
- }
-
- public LeaseBlobManager getLeaseBlobManager() {
- return this.leaseBlobManager;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/c3b447ec/samza-azure/src/main/java/org/apache/samza/BlobUtils.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/BlobUtils.java b/samza-azure/src/main/java/org/apache/samza/BlobUtils.java
deleted file mode 100644
index a798384..0000000
--- a/samza-azure/src/main/java/org/apache/samza/BlobUtils.java
+++ /dev/null
@@ -1,280 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza;
-
-import com.microsoft.azure.storage.AccessCondition;
-import com.microsoft.azure.storage.StorageException;
-import com.microsoft.azure.storage.blob.CloudBlobClient;
-import com.microsoft.azure.storage.blob.CloudBlobContainer;
-import com.microsoft.azure.storage.blob.CloudPageBlob;
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.URISyntaxException;
-import java.util.Arrays;
-import java.util.List;
-import org.apache.samza.job.model.JobModel;
-import org.apache.samza.serializers.model.SamzaObjectMapper;
-import org.eclipse.jetty.http.HttpStatus;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * Client side class that has reference to Azure blob storage.
- * Used for writing and reading from the blob.
- * Every write requires a valid lease ID.
- */
-public class BlobUtils {
-
- private static final Logger LOG = LoggerFactory.getLogger(BlobUtils.class);
- private static final long JOB_MODEL_BLOCK_SIZE = 1024000;
- private static final long BARRIER_STATE_BLOCK_SIZE = 1024;
- private static final long PROCESSOR_LIST_BLOCK_SIZE = 1024;
- private CloudBlobClient blobClient;
- private CloudBlobContainer container;
- private CloudPageBlob blob;
-
- /**
- * Creates an object of BlobUtils. It creates the container and page blob if they don't exist already.
- * @param client Client handle for access to Azure Storage account.
- * @param containerName Name of container inside which we want the blob to reside.
- * @param blobName Name of the blob to be managed.
- * @param length Length of the page blob.
- * @throws AzureException If an Azure storage service error occurred, or when the container name or blob name is invalid.
- */
- public BlobUtils(AzureClient client, String containerName, String blobName, long length) {
- this.blobClient = client.getBlobClient();
- try {
- this.container = blobClient.getContainerReference(containerName);
- container.createIfNotExists();
- this.blob = container.getPageBlobReference(blobName);
- if (!blob.exists()) {
- blob.create(length, AccessCondition.generateIfNotExistsCondition(), null, null);
- }
- } catch (URISyntaxException e) {
- LOG.error("Container name: " + containerName + " or blob name: " + blobName + " invalid.", e);
- throw new AzureException(e);
- } catch (StorageException e) {
- int httpStatusCode = e.getHttpStatusCode();
- if (httpStatusCode == HttpStatus.CONFLICT_409) {
- LOG.info("The blob you're trying to create exists already.", e);
- } else {
- LOG.error("Azure Storage Exception!", e);
- throw new AzureException(e);
- }
- }
- }
-
- /**
- * Writes the job model to the blob.
- * Write is successful only if the lease ID passed is valid and the processor holds the lease.
- * Called by the leader.
- * @param prevJM Previous job model version that the processor was operating on.
- * @param currJM Current job model version that the processor is operating on.
- * @param prevJMV Previous job model version that the processor was operating on.
- * @param currJMV Current job model version that the processor is operating on.
- * @param leaseId LeaseID of the lease that the processor holds on the blob. Null if there is no lease.
- * @return true if write to the blob is successful, false if leaseID is null or an Azure storage service error or IO exception occurred.
- */
- public boolean publishJobModel(JobModel prevJM, JobModel currJM, String prevJMV, String currJMV, String leaseId) {
- try {
- if (leaseId == null) {
- return false;
- }
- JobModelBundle bundle = new JobModelBundle(prevJM, currJM, prevJMV, currJMV);
- byte[] data = SamzaObjectMapper.getObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsBytes(bundle);
- byte[] pageData = Arrays.copyOf(data, (int) JOB_MODEL_BLOCK_SIZE);
- InputStream is = new ByteArrayInputStream(pageData);
- blob.uploadPages(is, 0, JOB_MODEL_BLOCK_SIZE, AccessCondition.generateLeaseCondition(leaseId), null, null);
- LOG.info("Uploaded {} jobModel to blob", bundle.getCurrJobModel());
- return true;
- } catch (StorageException | IOException e) {
- LOG.error("JobModel publish failed for version = " + currJMV, e);
- return false;
- }
- }
-
- /**
- * Reads the current job model from the blob.
- * @return The current job model published on the blob. Returns null when job model details not found on the blob.
- * @throws AzureException in getJobModelBundle() if an Azure storage service error occurred.
- * @throws SamzaException in getJobModelBundle() if data retrieved from blob could not be parsed by SamzaObjectMapper.
- */
- public JobModel getJobModel() {
- LOG.info("Reading the job model from blob.");
- JobModelBundle jmBundle = getJobModelBundle();
- if (jmBundle == null) {
- LOG.error("Job Model details don't exist on the blob.");
- return null;
- }
- JobModel jm = jmBundle.getCurrJobModel();
- return jm;
- }
-
- /**
- * Reads the current job model version from the blob .
- * @return Current job model version published on the blob. Returns null when job model details not found on the blob.
- * @throws AzureException in getJobModelBundle() if an Azure storage service error occurred.
- * @throws SamzaException in getJobModelBundle() if data retrieved from blob could not be parsed by SamzaObjectMapper.
- */
- public String getJobModelVersion() {
- LOG.info("Reading the job model version from blob.");
- JobModelBundle jmBundle = getJobModelBundle();
- if (jmBundle == null) {
- LOG.error("Job Model details don't exist on the blob.");
- return null;
- }
- String jmVersion = jmBundle.getCurrJobModelVersion();
- return jmVersion;
- }
-
- /**
- * Writes the barrier state to the blob.
- * Write is successful only if the lease ID passed is valid and the processor holds the lease.
- * Called only by the leader.
- * @param state Barrier state to be published to the blob.
- * @param leaseId LeaseID of the valid lease that the processor holds on the blob. Null if there is no lease.
- * @return true if write to the blob is successful, false if leaseID is null or an Azure storage service error or IO exception occurred.
- */
- public boolean publishBarrierState(String state, String leaseId) {
- try {
- if (leaseId == null) {
- return false;
- }
- byte[] data = SamzaObjectMapper.getObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsBytes(state);
- byte[] pageData = Arrays.copyOf(data, (int) BARRIER_STATE_BLOCK_SIZE);
- InputStream is = new ByteArrayInputStream(pageData);
-
- //uploadPages is only successful when the AccessCondition provided has an active and valid lease ID. It fails otherwise.
- blob.uploadPages(is, JOB_MODEL_BLOCK_SIZE, BARRIER_STATE_BLOCK_SIZE, AccessCondition.generateLeaseCondition(leaseId), null, null);
- LOG.info("Uploaded barrier state {} to blob", state);
- return true;
- } catch (StorageException | IOException e) {
- LOG.error("Barrier state " + state + " publish failed", e);
- return false;
- }
- }
-
- /**
- * Reads the current barrier state from the blob.
- * @return Barrier state published on the blob.
- * @throws AzureException If an Azure storage service error occurred.
- * @throws SamzaException If data retrieved from blob could not be parsed by SamzaObjectMapper.
- */
- public String getBarrierState() {
- LOG.info("Reading the barrier state from blob.");
- byte[] data = new byte[(int) BARRIER_STATE_BLOCK_SIZE];
- try {
- blob.downloadRangeToByteArray(JOB_MODEL_BLOCK_SIZE, BARRIER_STATE_BLOCK_SIZE, data, 0);
- } catch (StorageException e) {
- LOG.error("Failed to read barrier state from blob.", e);
- throw new AzureException(e);
- }
- String state;
- try {
- state = SamzaObjectMapper.getObjectMapper().readValue(data, String.class);
- } catch (IOException e) {
- LOG.error("Failed to parse byte data: " + data + " for barrier state retrieved from the blob.", e);
- throw new SamzaException(e);
- }
- return state;
- }
-
- /**
- * Writes the list of live processors in the system to the blob.
- * Write is successful only if the lease ID passed is valid and the processor holds the lease.
- * Called only by the leader.
- * @param processors List of live processors to be published on the blob.
- * @param leaseId LeaseID of the valid lease that the processor holds on the blob. Null if there is no lease.
- * @return true if write to the blob is successful, false if leaseID is null or an Azure storage service error or IO exception occurred.
- */
- public boolean publishLiveProcessorList(List<String> processors, String leaseId) {
- try {
- if (leaseId == null) {
- return false;
- }
- byte[] data = SamzaObjectMapper.getObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsBytes(processors);
- byte[] pageData = Arrays.copyOf(data, (int) BARRIER_STATE_BLOCK_SIZE);
- InputStream is = new ByteArrayInputStream(pageData);
- blob.uploadPages(is, JOB_MODEL_BLOCK_SIZE + BARRIER_STATE_BLOCK_SIZE, PROCESSOR_LIST_BLOCK_SIZE, AccessCondition.generateLeaseCondition(leaseId), null, null);
- LOG.info("Uploaded list of live processors to blob.");
- return true;
- } catch (StorageException | IOException e) {
- LOG.error("Processor list: " + processors + "publish failed", e);
- return false;
- }
- }
-
- /**
- * Reads the list of live processors published on the blob.
- * @return String list of live processors.
- * @throws AzureException If an Azure storage service error occurred.
- * @throws SamzaException If data retrieved from blob could not be parsed by SamzaObjectMapper.
- */
- public List<String> getLiveProcessorList() {
- LOG.info("Read the the list of live processors from blob.");
- byte[] data = new byte[(int) PROCESSOR_LIST_BLOCK_SIZE];
- try {
- blob.downloadRangeToByteArray(JOB_MODEL_BLOCK_SIZE + BARRIER_STATE_BLOCK_SIZE, PROCESSOR_LIST_BLOCK_SIZE, data, 0);
- } catch (StorageException e) {
- LOG.error("Failed to read the list of live processors from the blob.", new AzureException(e));
- throw new AzureException(e);
- }
- List<String> list;
- try {
- list = SamzaObjectMapper.getObjectMapper().readValue(data, List.class);
- } catch (IOException e) {
- LOG.error("Failed to parse byte data: " + data + " for live processor list retrieved from the blob", new SamzaException(e));
- throw new SamzaException(e);
- }
- return list;
- }
-
- public CloudBlobClient getBlobClient() {
- return this.blobClient;
- }
-
- public CloudBlobContainer getBlobContainer() {
- return this.container;
- }
-
- public CloudPageBlob getBlob() {
- return this.blob;
- }
-
- private JobModelBundle getJobModelBundle() {
- byte[] data = new byte[(int) JOB_MODEL_BLOCK_SIZE];
- try {
- blob.downloadRangeToByteArray(0, JOB_MODEL_BLOCK_SIZE, data, 0);
- } catch (StorageException e) {
- LOG.error("Failed to read JobModel details from the blob.", e);
- throw new AzureException(e);
- }
- try {
- JobModelBundle jmBundle = SamzaObjectMapper.getObjectMapper().readValue(data, JobModelBundle.class);
- return jmBundle;
- } catch (IOException e) {
- LOG.error("Failed to parse byte data: " + data + " for JobModel details retrieved from the blob", e);
- throw new SamzaException(e);
- }
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/c3b447ec/samza-azure/src/main/java/org/apache/samza/JobModelBundle.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/JobModelBundle.java b/samza-azure/src/main/java/org/apache/samza/JobModelBundle.java
deleted file mode 100644
index 3ff971f..0000000
--- a/samza-azure/src/main/java/org/apache/samza/JobModelBundle.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza;
-
-import org.apache.samza.job.model.JobModel;
-
-
-/**
- * Bundle class for current and previous - job model and job model version.
- * Used for publishing updated data to the blob in one go.
- */
-public class JobModelBundle {
-
- private JobModel prevJobModel;
- private JobModel currJobModel;
- private String prevJobModelVersion;
- private String currJobModelVersion;
-
- public JobModelBundle() {}
-
- public JobModelBundle(JobModel prevJM, JobModel currJM, String prevJMV, String currJMV) {
- prevJobModel = prevJM;
- currJobModel = currJM;
- prevJobModelVersion = prevJMV;
- currJobModelVersion = currJMV;
- }
-
- public JobModel getCurrJobModel() {
- return currJobModel;
- }
-
- public JobModel getPrevJobModel() {
- return prevJobModel;
- }
-
- public String getCurrJobModelVersion() {
- return currJobModelVersion;
- }
-
- public String getPrevJobModelVersion() {
- return prevJobModelVersion;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/c3b447ec/samza-azure/src/main/java/org/apache/samza/LeaseBlobManager.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/LeaseBlobManager.java b/samza-azure/src/main/java/org/apache/samza/LeaseBlobManager.java
deleted file mode 100644
index 5375662..0000000
--- a/samza-azure/src/main/java/org/apache/samza/LeaseBlobManager.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza;
-
-import com.microsoft.azure.storage.AccessCondition;
-import com.microsoft.azure.storage.StorageException;
-import com.microsoft.azure.storage.blob.CloudPageBlob;
-import org.eclipse.jetty.http.HttpStatus;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * Helper class for lease blob operations.
- */
-public class LeaseBlobManager {
-
- private static final Logger LOG = LoggerFactory.getLogger(LeaseBlobManager.class);
- private CloudPageBlob leaseBlob;
-
- public LeaseBlobManager(CloudPageBlob leaseBlob) {
- this.leaseBlob = leaseBlob;
- }
-
- /**
- * Acquires a lease on a blob. The lease ID is NULL initially.
- * @param leaseTimeInSec The time in seconds you want to acquire the lease for.
- * @param leaseId Proposed ID you want to acquire the lease with, null if not proposed.
- * @return String that represents lease ID. Null if acquireLease is unsuccessful because the blob is leased already.
- * @throws AzureException If a Azure storage service error occurred. This includes the case where the blob you're trying to lease does not exist.
- */
- public String acquireLease(int leaseTimeInSec, String leaseId) {
- try {
- String id = leaseBlob.acquireLease(leaseTimeInSec, leaseId);
- LOG.info("Acquired lease with lease id = " + id);
- return id;
- } catch (StorageException storageException) {
- int httpStatusCode = storageException.getHttpStatusCode();
- if (httpStatusCode == HttpStatus.CONFLICT_409) {
- LOG.info("The blob you're trying to acquire is leased already.", storageException);
- } else if (httpStatusCode == HttpStatus.NOT_FOUND_404) {
- LOG.error("The blob you're trying to lease does not exist.", storageException);
- throw new AzureException(storageException);
- } else {
- LOG.error("Error acquiring lease!", storageException);
- throw new AzureException(storageException);
- }
- }
- return null;
- }
-
- /**
- * Renews the lease on the blob.
- * @param leaseId ID of the lease to be renewed.
- * @return True if lease was renewed successfully, false otherwise.
- */
- public boolean renewLease(String leaseId) {
- try {
- leaseBlob.renewLease(AccessCondition.generateLeaseCondition(leaseId));
- return true;
- } catch (StorageException storageException) {
- LOG.error("Wasn't able to renew lease with lease id: " + leaseId, storageException);
- return false;
- }
- }
-
- /**
- * Releases the lease on the blob.
- * @param leaseId ID of the lease to be released.
- * @return True if released successfully, false otherwise.
- */
- public boolean releaseLease(String leaseId) {
- try {
- leaseBlob.releaseLease(AccessCondition.generateLeaseCondition(leaseId));
- return true;
- } catch (StorageException storageException) {
- LOG.error("Wasn't able to release lease with lease id: " + leaseId, storageException);
- return false;
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/c3b447ec/samza-azure/src/main/java/org/apache/samza/ProcessorEntity.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/ProcessorEntity.java b/samza-azure/src/main/java/org/apache/samza/ProcessorEntity.java
deleted file mode 100644
index 5145821..0000000
--- a/samza-azure/src/main/java/org/apache/samza/ProcessorEntity.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza;
-
-import com.microsoft.azure.storage.table.TableServiceEntity;
-
-
-/**
- * Table schema for Azure processor table.
- * Denotes a row in the table with PARTITION KEY = Job Model Version and ROW KEY = Processor ID
- * Other fields include integer liveness value to which each processor heartbeats,
- * and boolean isLeader value which denotes whether the processor is a leader or not.
- */
-public class ProcessorEntity extends TableServiceEntity {
- private int liveness;
- private boolean isLeader;
-
- public ProcessorEntity() {}
-
- public ProcessorEntity(String jobModelVersion, String processorId) {
- this.partitionKey = jobModelVersion;
- this.rowKey = processorId;
- this.isLeader = false;
- }
-
- /**
- * Updates heartbeat by updating the liveness value in the table.
- * @param value
- */
- public void setLiveness(int value) {
- liveness = value;
- }
-
- public void setIsLeader(boolean leader) {
- isLeader = leader;
- }
-
- public boolean getIsLeader() {
- return isLeader;
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/c3b447ec/samza-azure/src/main/java/org/apache/samza/TableUtils.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/TableUtils.java b/samza-azure/src/main/java/org/apache/samza/TableUtils.java
deleted file mode 100644
index e49fd90..0000000
--- a/samza-azure/src/main/java/org/apache/samza/TableUtils.java
+++ /dev/null
@@ -1,198 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza;
-
-import com.microsoft.azure.storage.StorageException;
-import com.microsoft.azure.storage.table.CloudTable;
-import com.microsoft.azure.storage.table.CloudTableClient;
-import com.microsoft.azure.storage.table.TableOperation;
-import com.microsoft.azure.storage.table.TableQuery;
-import java.net.URISyntaxException;
-import java.util.HashSet;
-import java.util.Random;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicReference;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * Client side class that has a reference to Azure Table Storage.
- * Enables the user to add or delete information from the table, make updates to the table and retrieve information from the table.
- * Every row in a table is uniquely identified by a combination of the PARTITION KEY and ROW KEY.
- * PARTITION KEY = Group ID = Job Model Version (for this case).
- * ROW KEY = Unique entity ID for a group = Processor ID (for this case).
- */
-public class TableUtils {
-
- private static final Logger LOG = LoggerFactory.getLogger(TableUtils.class);
- private static final String PARTITION_KEY = "PartitionKey";
- private static final long CHECK_LIVENESS_DELAY = 30; // seconds (multiplied by 1000 before comparing with millisecond timestamps)
- private static final String INITIAL_STATE = "unassigned";
- private CloudTableClient tableClient;
- private CloudTable table;
-
- public TableUtils(AzureClient client, String tableName) { // storage errors below are only logged, so table may remain null
- tableClient = client.getTableClient();
- try {
- table = tableClient.getTableReference(tableName);
- table.createIfNotExists();
- } catch (URISyntaxException e) {
- LOG.error("\nConnection string specifies an invalid URI.", new SamzaException(e));
- } catch (StorageException e) {
- LOG.error("Azure storage exception.", new SamzaException(e));
- }
- }
-
- /**
- * Add a row which denotes an active processor to the processor table.
- * @param jmVersion Job model version that the processor is operating on.
- * @param pid Unique processor ID.
- * @param liveness Random heartbeat value.
- * @param isLeader Denotes whether the processor is a leader or not.
- * @throws AzureException If an Azure storage service error occurred.
- */
- public void addProcessorEntity(String jmVersion, String pid, int liveness, boolean isLeader) {
- ProcessorEntity entity = new ProcessorEntity(jmVersion, pid);
- entity.setIsLeader(isLeader);
- entity.setLiveness(liveness);
- TableOperation add = TableOperation.insert(entity);
- try {
- table.execute(add);
- } catch (StorageException e) {
- LOG.error("Azure storage exception while adding processor entity with job model version: " + jmVersion + "and pid: " + pid, e);
- throw new AzureException(e);
- }
- }
-
- /**
- * Retrieve a particular row in the processor table, given the partition key and the row key.
- * @param jmVersion Job model version of the processor row to be retrieved.
- * @param pid Unique processor ID of the processor row to be retrieved.
- * @return An instance of required processor entity. Null if does not exist.
- * @throws AzureException If an Azure storage service error occurred.
- */
- public ProcessorEntity getEntity(String jmVersion, String pid) {
- try {
- TableOperation retrieveEntity = TableOperation.retrieve(jmVersion, pid, ProcessorEntity.class);
- ProcessorEntity entity = table.execute(retrieveEntity).getResultAsType();
- return entity;
- } catch (StorageException e) {
- LOG.error("Azure storage exception while retrieving processor entity with job model version: " + jmVersion + "and pid: " + pid, e);
- throw new AzureException(e);
- }
- }
-
- /**
- * Updates the liveness value of a particular processor with a randomly generated integer, which in turn updates the last modified since timestamp of the row. Storage errors are logged and not rethrown.
- * @param jmVersion Job model version of the processor row to be updated.
- * @param pid Unique processor ID of the processor row to be updated.
- */
- public void updateHeartbeat(String jmVersion, String pid) {
- try {
- Random rand = new Random();
- int value = rand.nextInt(10000) + 2;
- TableOperation retrieveEntity = TableOperation.retrieve(jmVersion, pid, ProcessorEntity.class);
- ProcessorEntity entity = table.execute(retrieveEntity).getResultAsType(); // NOTE(review): null if the row does not exist (see getEntity), which would NPE below
- entity.setLiveness(value);
- TableOperation update = TableOperation.replace(entity);
- table.execute(update);
- } catch (StorageException e) {
- LOG.error("Azure storage exception while updating heartbeat for job model version: " + jmVersion + "and pid: " + pid, e);
- }
- }
-
- /**
- * Updates the isLeader value when the processor starts or stops being a leader.
- * @param jmVersion Job model version of the processor row to be updated.
- * @param pid Unique processor ID of the processor row to be updated.
- * @param isLeader Denotes whether the processor is a leader or not.
- * @throws AzureException If an Azure storage service error occurred.
- */
- public void updateIsLeader(String jmVersion, String pid, boolean isLeader) {
- try {
- TableOperation retrieveEntity = TableOperation.retrieve(jmVersion, pid, ProcessorEntity.class);
- ProcessorEntity entity = table.execute(retrieveEntity).getResultAsType(); // NOTE(review): null if the row does not exist (see getEntity)
- entity.setIsLeader(isLeader);
- TableOperation update = TableOperation.replace(entity);
- table.execute(update);
- } catch (StorageException e) {
- LOG.error("Azure storage exception while updating isLeader value for job model version: " + jmVersion + "and pid: " + pid, e);
- throw new AzureException(e);
- }
- }
-
- /**
- * Deletes a specified row in the processor table.
- * @param jmVersion Job model version of the processor row to be deleted.
- * @param pid Unique processor ID of the processor row to be deleted.
- * @throws AzureException If an Azure storage service error occurred.
- */
- public void deleteProcessorEntity(String jmVersion, String pid) {
- try {
- TableOperation retrieveEntity = TableOperation.retrieve(jmVersion, pid, ProcessorEntity.class);
- ProcessorEntity entity = table.execute(retrieveEntity).getResultAsType(); // NOTE(review): null if the row does not exist (see getEntity)
- TableOperation remove = TableOperation.delete(entity);
- table.execute(remove);
- } catch (StorageException e) {
- LOG.error("Azure storage exception while deleting processor entity with job model version: " + jmVersion + "and pid: " + pid, e);
- throw new AzureException(e);
- }
- }
-
- /**
- * Retrieve all rows in a table with the given partition key.
- * @param partitionKey Job model version of the processors to be retrieved.
- * @return Iterable over the processor entities stored in that partition.
- */
- public Iterable<ProcessorEntity> getEntitiesWithPartition(String partitionKey) {
- String partitionFilter = TableQuery.generateFilterCondition(PARTITION_KEY, TableQuery.QueryComparisons.EQUAL, partitionKey);
- TableQuery<ProcessorEntity> partitionQuery = TableQuery.from(ProcessorEntity.class).where(partitionFilter);
- return table.execute(partitionQuery);
- }
-
- /**
- * Gets the ids of processors whose rows in the current job model version partition or the INITIAL_STATE partition were modified within the last CHECK_LIVENESS_DELAY seconds.
- * @param currentJMVersion Current job model version that the processors in the application are operating on.
- * @return Set of row keys (processor ids) of the processors considered alive.
- */
- public Set<String> getActiveProcessorsList(AtomicReference<String> currentJMVersion) {
- Iterable<ProcessorEntity> tableList = getEntitiesWithPartition(currentJMVersion.get());
- Set<String> activeProcessorsList = new HashSet<>();
- for (ProcessorEntity entity: tableList) {
- if (System.currentTimeMillis() - entity.getTimestamp().getTime() <= CHECK_LIVENESS_DELAY * 1000) {
- activeProcessorsList.add(entity.getRowKey());
- }
- }
-
- Iterable<ProcessorEntity> unassignedList = getEntitiesWithPartition(INITIAL_STATE);
- for (ProcessorEntity entity: unassignedList) {
- if (System.currentTimeMillis() - entity.getTimestamp().getTime() <= CHECK_LIVENESS_DELAY * 1000) {
- activeProcessorsList.add(entity.getRowKey());
- }
- }
- return activeProcessorsList;
- }
-
- public CloudTable getTable() {
- return table;
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/c3b447ec/samza-azure/src/main/java/org/apache/samza/config/AzureConfig.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/config/AzureConfig.java b/samza-azure/src/main/java/org/apache/samza/config/AzureConfig.java
new file mode 100644
index 0000000..dc96d2d
--- /dev/null
+++ b/samza-azure/src/main/java/org/apache/samza/config/AzureConfig.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.config;
+
+/**
+ * Config class for reading all user defined parameters for Azure driven coordination services.
+ *
+ * <p>The Azure container, blob and table names are derived from the global app id and stored per instance,
+ * so AzureConfig objects built from different configs do not overwrite each other's names.
+ */
+public class AzureConfig extends MapConfig {
+
+  // Connection string for Azure Storage Account, format: "DefaultEndpointsProtocol=<https>;AccountName=<>;AccountKey=<>"
+  public static final String AZURE_STORAGE_CONNECT = "azure.storage.connect";
+  // Length of the coordination page blob; DEFAULT_AZURE_PAGEBLOB_LENGTH is used when it is not configured.
+  public static final String AZURE_PAGEBLOB_LENGTH = "job.coordinator.azure.blob.length";
+  public static final long DEFAULT_AZURE_PAGEBLOB_LENGTH = 5120000;
+
+  // Per-instance (not static) so that each AzureConfig keeps the names derived from its own app id.
+  private final String containerName;
+  private final String blobName;
+  private final String tableName;
+
+  /**
+   * Wraps {@code config} and derives the Azure container, blob and table names from its global app id.
+   * @param config User defined config.
+   */
+  public AzureConfig(Config config) {
+    super(config);
+    ApplicationConfig appConfig = new ApplicationConfig(config);
+    //Remove all non-alphanumeric characters from id as table name does not allow them.
+    String id = appConfig.getGlobalAppId().replaceAll("[^A-Za-z0-9]", "");
+    // Azure blob container names must be lowercase; table names are case-insensitive and blob names may be mixed case.
+    containerName = "samzacontainer" + id.toLowerCase(java.util.Locale.ROOT);
+    blobName = "samzablob" + id;
+    tableName = "samzatable" + id;
+  }
+
+  /**
+   * @return the Azure Storage connection string.
+   * @throws ConfigException if {@value #AZURE_STORAGE_CONNECT} is not configured.
+   */
+  public String getAzureConnect() {
+    if (!containsKey(AZURE_STORAGE_CONNECT)) {
+      throw new ConfigException("Missing " + AZURE_STORAGE_CONNECT + " config!");
+    }
+    return get(AZURE_STORAGE_CONNECT);
+  }
+
+  /** @return the blob container name: "samzacontainer" followed by the lowercased alphanumeric app id. */
+  public String getAzureContainerName() {
+    return containerName;
+  }
+
+  /** @return the blob name: "samzablob" followed by the alphanumeric app id. */
+  public String getAzureBlobName() {
+    return blobName;
+  }
+
+  /** @return the configured page blob length, or {@link #DEFAULT_AZURE_PAGEBLOB_LENGTH} if not configured. */
+  public long getAzureBlobLength() {
+    return getLong(AZURE_PAGEBLOB_LENGTH, DEFAULT_AZURE_PAGEBLOB_LENGTH);
+  }
+
+  /** @return the table name: "samzatable" followed by the alphanumeric app id. */
+  public String getAzureTableName() {
+    return tableName;
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/c3b447ec/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinator.java b/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinator.java
new file mode 100644
index 0000000..9438690
--- /dev/null
+++ b/samza-azure/src/main/java/org/apache/samza/coordinator/AzureJobCoordinator.java
@@ -0,0 +1,581 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.samza.AzureClient;
+import org.apache.samza.config.AzureConfig;
+import org.apache.samza.coordinator.data.BarrierState;
+import org.apache.samza.config.ApplicationConfig;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.ConfigException;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.TaskConfig;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouper;
+import org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouperFactory;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.runtime.ProcessorIdGenerator;
+import org.apache.samza.coordinator.scheduler.HeartbeatScheduler;
+import org.apache.samza.coordinator.scheduler.JMVersionUpgradeScheduler;
+import org.apache.samza.coordinator.scheduler.LeaderBarrierCompleteScheduler;
+import org.apache.samza.coordinator.scheduler.LeaderLivenessCheckScheduler;
+import org.apache.samza.coordinator.scheduler.LivenessCheckScheduler;
+import org.apache.samza.coordinator.scheduler.RenewLeaseScheduler;
+import org.apache.samza.coordinator.scheduler.SchedulerStateChangeListener;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.BlobUtils;
+import org.apache.samza.util.ClassLoaderHelper;
+import org.apache.samza.util.LeaseBlobManager;
+import org.apache.samza.util.TableUtils;
+import org.apache.samza.util.Util;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.collection.JavaConverters;
+
+
+/**
+ * Class that provides coordination mechanism for Samza standalone in Azure.
+ * Handles processor lifecycle through Azure blob and table storage. Orchestrates leader election.
+ * The leader job coordinator generates partition mapping, writes shared data to the blob and manages rebalancing.
+ */
+public class AzureJobCoordinator implements JobCoordinator {
+  private static final Logger LOG = LoggerFactory.getLogger(AzureJobCoordinator.class);
+  private static final int METADATA_CACHE_TTL_MS = 5000;
+  // Job model version in effect until the first barrier completes; also the table partition of newly started processors.
+  private static final String INITIAL_STATE = "UNASSIGNED";
+  private final Consumer<String> errorHandler;
+  private final AzureLeaderElector azureLeaderElector;
+  private final BlobUtils leaderBlob;
+  private final TableUtils table;
+  private final Config config;
+  private final String processorId;
+  private final AzureClient client;
+  private final AtomicReference<String> currentJMVersion;
+  private final AtomicBoolean versionUpgradeDetected;
+  private final HeartbeatScheduler heartbeat;
+  private final JMVersionUpgradeScheduler versionUpgrade;
+  private final LeaderLivenessCheckScheduler leaderAlive;
+  private LivenessCheckScheduler liveness;
+  private RenewLeaseScheduler renewLease;
+  private LeaderBarrierCompleteScheduler leaderBarrierScheduler;
+  private StreamMetadataCache streamMetadataCache = null;
+  private JobCoordinatorListener coordinatorListener = null;
+  private JobModel jobModel = null;
+
+  /**
+   * Creates an instance of Azure job coordinator, along with references to Azure leader elector, Azure Blob and Azure Table.
+   * @param config User defined config
+   * @throws ConfigException if no processor id (or id generator) or no Azure storage connection string is configured.
+   */
+  public AzureJobCoordinator(Config config) {
+    //TODO: Cleanup previous values in the table when barrier times out.
+    this.config = config;
+    processorId = createProcessorId(config);
+    currentJMVersion = new AtomicReference<>(INITIAL_STATE);
+    AzureConfig azureConfig = new AzureConfig(config);
+    client = new AzureClient(azureConfig.getAzureConnect());
+    leaderBlob = new BlobUtils(client, azureConfig.getAzureContainerName(), azureConfig.getAzureBlobName(), azureConfig.getAzureBlobLength());
+    errorHandler = (errorMsg) -> {
+      LOG.error(errorMsg);
+      stop();
+    };
+    table = new TableUtils(client, azureConfig.getAzureTableName(), INITIAL_STATE);
+    azureLeaderElector = new AzureLeaderElector(new LeaseBlobManager(leaderBlob.getBlob()));
+    azureLeaderElector.setLeaderElectorListener(new AzureLeaderElectorListener());
+    versionUpgradeDetected = new AtomicBoolean(false);
+    heartbeat = new HeartbeatScheduler(errorHandler, table, currentJMVersion, processorId);
+    versionUpgrade = new JMVersionUpgradeScheduler(errorHandler, leaderBlob, currentJMVersion, versionUpgradeDetected, processorId);
+    leaderAlive = new LeaderLivenessCheckScheduler(errorHandler, table, leaderBlob, currentJMVersion, INITIAL_STATE);
+    leaderBarrierScheduler = null;
+    renewLease = null;
+    liveness = null;
+  }
+
+  /**
+   * Registers this processor under the initial job model version, starts heartbeating, tries to become leader
+   * and starts the schedulers that watch for job model version upgrades and for leader liveness.
+   */
+  @Override
+  public void start() {
+    LOG.info("Starting Azure job coordinator.");
+    streamMetadataCache = StreamMetadataCache.apply(METADATA_CACHE_TTL_MS, config);
+    table.addProcessorEntity(INITIAL_STATE, processorId, false);
+
+    // Start scheduler for heartbeating
+    LOG.info("Starting scheduler for heartbeating.");
+    heartbeat.scheduleTask();
+
+    azureLeaderElector.tryBecomeLeader();
+
+    // Start scheduler to check for job model version upgrades
+    LOG.info("Starting scheduler to check for job model version upgrades.");
+    versionUpgrade.setStateChangeListener(createJMVersionUpgradeListener());
+    versionUpgrade.scheduleTask();
+
+    // Start scheduler to check for leader liveness
+    LOG.info("Starting scheduler to check for leader liveness.");
+    leaderAlive.setStateChangeListener(createLeaderLivenessListener());
+    leaderAlive.scheduleTask();
+  }
+
+  /**
+   * Expires the current job model, resigns leadership if held, shuts down all schedulers and notifies the listener.
+   */
+  @Override
+  public void stop() {
+    LOG.info("Shutting down Azure job coordinator.");
+
+    if (coordinatorListener != null) {
+      coordinatorListener.onJobModelExpired();
+    }
+
+    // Resign leadership
+    if (azureLeaderElector.amILeader()) {
+      azureLeaderElector.resignLeadership();
+    }
+
+    // Shutdown all schedulers
+    shutdownSchedulers();
+
+    if (coordinatorListener != null) {
+      coordinatorListener.onCoordinatorStop();
+    }
+  }
+
+  @Override
+  public String getProcessorId() {
+    return processorId;
+  }
+
+  @Override
+  public void setListener(JobCoordinatorListener listener) {
+    this.coordinatorListener = listener;
+  }
+
+  @Override
+  public JobModel getJobModel() {
+    return jobModel;
+  }
+
+  /**
+   * Shuts down every scheduler this coordinator has created; leader-only schedulers may not exist yet.
+   */
+  private void shutdownSchedulers() {
+    if (renewLease != null) {
+      renewLease.shutdown();
+    }
+    if (leaderBarrierScheduler != null) {
+      leaderBarrierScheduler.shutdown();
+    }
+    if (liveness != null) {
+      liveness.shutdown();
+    }
+    heartbeat.shutdown();
+    leaderAlive.shutdown();
+    versionUpgrade.shutdown();
+  }
+
+  /**
+   * Creates a listener for LeaderBarrierCompleteScheduler class.
+   * Invoked by the leader when the barrier started for {@code nextJMVersion} completes or times out.
+   * Publishes the END or TIMEOUT barrier state on the blob and cancels the LeaderBarrierComplete scheduler.
+   * If the barrier state cannot be published, this processor is stopped and its table entry deleted.
+   * @param nextJMVersion Job model version whose barrier is being resolved.
+   * @param barrierTimeout True when the barrier timed out rather than completed (presumably set by the scheduler).
+   * @return an instance of SchedulerStateChangeListener.
+   */
+  private SchedulerStateChangeListener createLeaderBarrierCompleteListener(String nextJMVersion, AtomicBoolean barrierTimeout) {
+    return () -> {
+      versionUpgradeDetected.set(false);
+      String state;
+      if (barrierTimeout.get()) {
+        LOG.error("Barrier timed out for version {}", nextJMVersion);
+        state = BarrierState.TIMEOUT.name() + " " + nextJMVersion;
+      } else {
+        LOG.info("Leader detected barrier completion.");
+        state = BarrierState.END.name() + " " + nextJMVersion;
+      }
+      if (!leaderBlob.publishBarrierState(state, azureLeaderElector.getLeaseId().get())) {
+        LOG.info("Leader failed to publish the barrier state {}. Stopping the processor with PID: {}.", state, processorId);
+        stop();
+        table.deleteProcessorEntity(currentJMVersion.get(), processorId);
+      }
+      leaderBarrierScheduler.shutdown();
+    };
+  }
+
+  /**
+   * Creates a listener for LivenessCheckScheduler class.
+   * Invoked by the leader when the list of active processors in the system changes.
+   * @param liveProcessors Live processor list held by the scheduler; doOnProcessorChange only reads it.
+   * @return an instance of SchedulerStateChangeListener.
+   */
+  private SchedulerStateChangeListener createLivenessListener(AtomicReference<List<String>> liveProcessors) {
+    return () -> {
+      LOG.info("Leader detected change in list of live processors.");
+      doOnProcessorChange(liveProcessors.get());
+    };
+  }
+
+  /**
+   * Creates a listener for JMVersionUpgradeScheduler class.
+   * Invoked when the processor detects a job model version upgrade on the blob.
+   * Stops listening for job model version upgrades until rebalancing achieved.
+   * @return an instance of SchedulerStateChangeListener.
+   */
+  private SchedulerStateChangeListener createJMVersionUpgradeListener() {
+    return () -> {
+      LOG.info("Job model version upgrade detected.");
+      versionUpgradeDetected.set(true);
+      onNewJobModelAvailable(leaderBlob.getJobModelVersion());
+    };
+  }
+
+  /**
+   * Creates a listener for LeaderLivenessCheckScheduler class.
+   * Invoked when an existing leader dies. Enables the JC to participate in leader election again.
+   * @return an instance of SchedulerStateChangeListener.
+   */
+  private SchedulerStateChangeListener createLeaderLivenessListener() {
+    return () -> {
+      LOG.info("Existing leader died.");
+      azureLeaderElector.tryBecomeLeader();
+    };
+  }
+
+  /**
+   * For each input stream specified in config, exactly determine its
+   * partitions, returning a set of SystemStreamPartitions containing them all.
+   */
+  private Set<SystemStreamPartition> getInputStreamPartitions() {
+    TaskConfig taskConfig = new TaskConfig(config);
+    scala.collection.immutable.Set<SystemStream> inputSystemStreams = taskConfig.getInputStreams();
+
+    // Get the set of partitions for each SystemStream from the stream metadata
+    Set<SystemStreamPartition> sspSet = JavaConverters.mapAsJavaMapConverter(streamMetadataCache.getStreamMetadata(inputSystemStreams, true)).asJava()
+        .entrySet()
+        .stream()
+        .flatMap(this::mapSSMToSSP)
+        .collect(Collectors.toSet());
+
+    return sspSet;
+  }
+
+  /**
+   * Expands one stream's metadata entry into one SystemStreamPartition per partition.
+   */
+  private Stream<SystemStreamPartition> mapSSMToSSP(Map.Entry<SystemStream, SystemStreamMetadata> ssMs) {
+    return ssMs.getValue()
+        .getSystemStreamPartitionMetadata()
+        .keySet()
+        .stream()
+        .map(partition -> new SystemStreamPartition(ssMs.getKey(), partition));
+  }
+
+  /**
+   * Gets a SystemStreamPartitionGrouper object from the configuration.
+   */
+  private SystemStreamPartitionGrouper getSystemStreamPartitionGrouper() {
+    JobConfig jobConfig = new JobConfig(config);
+    String factoryString = jobConfig.getSystemStreamPartitionGrouperFactory();
+    SystemStreamPartitionGrouper grouper = Util.<SystemStreamPartitionGrouperFactory>getObj(factoryString).getSystemStreamPartitionGrouper(jobConfig);
+    return grouper;
+  }
+
+  /**
+   * Groups all input SystemStreamPartitions with the configured grouper.
+   * @return the number of tasks produced by the grouping.
+   */
+  private int getMaxNumTasks() {
+    // Do grouping to fetch TaskName to SSP mapping
+    Set<SystemStreamPartition> allSystemStreamPartitions = getInputStreamPartitions();
+    SystemStreamPartitionGrouper grouper = getSystemStreamPartitionGrouper();
+    Map<TaskName, Set<SystemStreamPartition>> groups = grouper.group(allSystemStreamPartitions);
+    LOG.info("SystemStreamPartitionGrouper {} has grouped the SystemStreamPartitions into {} tasks with the following taskNames: {}",
+        grouper, groups.size(), groups.keySet());
+    return groups.size();
+  }
+
+  /**
+   * Converts a job model version string to its number; {@link #INITIAL_STATE} counts as version 0.
+   * @param jmVersion Either INITIAL_STATE or a decimal version number.
+   * @return the numeric version.
+   */
+  private static int toVersionNumber(String jmVersion) {
+    return INITIAL_STATE.equals(jmVersion) ? 0 : Integer.parseInt(jmVersion);
+  }
+
+  /**
+   * Returns a copy of {@code processorIds} trimmed to at most {@code numTasks} entries. Other processors are
+   * dropped from the front of the list; this processor's own id is never dropped, so the result can only exceed
+   * {@code numTasks} when every remaining entry is this processor's id (e.g. {@code numTasks == 0}).
+   * @param processorIds Candidate processor ids; not modified.
+   * @param numTasks Number of tasks produced by the grouper.
+   * @return a new, possibly shorter, list of processor ids.
+   */
+  private List<String> limitToNumTasks(List<String> processorIds, int numTasks) {
+    List<String> selected = new ArrayList<>(processorIds);
+    int index = 0;
+    // Step over this processor's id instead of removing it; the bound on index guarantees termination.
+    while (selected.size() > numTasks && index < selected.size()) {
+      if (selected.get(index).equals(processorId)) {
+        index++;
+      } else {
+        selected.remove(index);
+      }
+    }
+    return selected;
+  }
+
+  /**
+   * Called only by the leader, either when the processor becomes the leader, or when the list of live processors changes.
+   * Builds a new job model, publishes it together with a new barrier, and schedules the barrier completion check.
+   * @param liveProcessorIds Processor IDs which caused the rebalancing. An empty list means the call comes from
+   *     onBecomingLeader, in which case the live processors are read from the processor table. The list is not modified.
+   * @return true if the job model, barrier and processor list were published; false if publishing failed and the coordinator was stopped.
+   */
+  private boolean doOnProcessorChange(List<String> liveProcessorIds) {
+    String prevJMVersion = currentJMVersion.get();
+    JobModel prevJobModel = jobModel;
+    String nextJMVersion;
+    // Every processor live when this rebalance started: published to the blob and awaited by the barrier.
+    List<String> initialProcessorIds;
+
+    if (liveProcessorIds.isEmpty()) {
+      nextJMVersion = Integer.toString(toVersionNumber(prevJMVersion) + 1);
+      initialProcessorIds = new ArrayList<>(table.getActiveProcessorsList(currentJMVersion));
+    } else {
+      initialProcessorIds = new ArrayList<>(liveProcessorIds);
+      nextJMVersion = Integer.toString(toVersionNumber(prevJMVersion) + 1);
+      // A newer version on the blob means the previous barrier was never reached: time it out and build on top of it.
+      String blobJMV = leaderBlob.getJobModelVersion();
+      if (blobJMV != null && Integer.parseInt(blobJMV) > toVersionNumber(prevJMVersion)) {
+        prevJMVersion = blobJMV;
+        prevJobModel = leaderBlob.getJobModel();
+        nextJMVersion = Integer.toString(Integer.parseInt(blobJMV) + 1);
+        versionUpgradeDetected.set(false);
+        if (leaderBarrierScheduler != null) {
+          leaderBarrierScheduler.shutdown();
+        }
+        leaderBlob.publishBarrierState(BarrierState.TIMEOUT.name() + " " + blobJMV, azureLeaderElector.getLeaseId().get());
+      }
+    }
+
+    // Limit the processors placed in the job model to the number of tasks (applies to both paths above).
+    List<String> currentProcessorIds = limitToNumTasks(initialProcessorIds, getMaxNumTasks());
+    LOG.info("currentProcessorIds = {}", currentProcessorIds);
+    LOG.info("initialProcessorIds = {}", initialProcessorIds);
+
+    // Generate the new JobModel
+    JobModel newJobModel = JobModelManager.readJobModel(this.config, Collections.emptyMap(),
+        null, streamMetadataCache, currentProcessorIds);
+    LOG.info("pid={}: Generated new Job Model. Version = {}", processorId, nextJMVersion);
+
+    // Publish the new job model, the barrier start and the list of processors this rebalance started with.
+    AtomicBoolean barrierTimeout = new AtomicBoolean(false);
+    boolean jmWrite = leaderBlob.publishJobModel(prevJobModel, newJobModel, prevJMVersion, nextJMVersion, azureLeaderElector.getLeaseId().get());
+    boolean barrierWrite = leaderBlob.publishBarrierState(BarrierState.START.name() + " " + nextJMVersion, azureLeaderElector.getLeaseId().get());
+    boolean processorWrite = leaderBlob.publishLiveProcessorList(initialProcessorIds, azureLeaderElector.getLeaseId().get());
+
+    //Shut down processor if write fails even after retries. These writes have an inherent retry policy.
+    if (!jmWrite || !barrierWrite || !processorWrite) {
+      LOG.info("Leader failed to publish the job model {}. Stopping the processor with PID: {}.", newJobModel, processorId);
+      stop();
+      table.deleteProcessorEntity(currentJMVersion.get(), processorId);
+      // The coordinator is stopped: do not start a barrier check for a version that was not fully published.
+      return false;
+    }
+
+    LOG.info("pid={}: Published new Job Model. Version = {}", processorId, nextJMVersion);
+
+    // Start scheduler to check if barrier reached
+    long startTime = System.currentTimeMillis();
+    leaderBarrierScheduler = new LeaderBarrierCompleteScheduler(errorHandler, table, nextJMVersion, initialProcessorIds, startTime, barrierTimeout, currentJMVersion, processorId);
+    leaderBarrierScheduler.setStateChangeListener(createLeaderBarrierCompleteListener(nextJMVersion, barrierTimeout));
+    leaderBarrierScheduler.scheduleTask();
+    return true;
+  }
+
+  /**
+   * Called when the JC detects a job model version upgrade on the shared blob.
+   * Fetches the new job model and, if it includes this processor, joins the barrier for {@code nextJMVersion}
+   * and polls the blob until the barrier completes, times out or is superseded by a newer version.
+   * @param nextJMVersion The new job model version after rebalancing.
+   */
+  private void onNewJobModelAvailable(final String nextJMVersion) {
+    LOG.info("pid={}: new JobModel available with job model version {}", processorId, nextJMVersion);
+
+    //Get the new job model from blob
+    jobModel = leaderBlob.getJobModel();
+    LOG.info("pid={}: new JobModel available. ver={}; jm = {}", processorId, nextJMVersion, jobModel);
+
+    if (!jobModel.getContainers().containsKey(processorId)) {
+      LOG.info("JobModel: {} does not contain the processorId: {}. Stopping the processor.", jobModel, processorId);
+      stop();
+      table.deleteProcessorEntity(currentJMVersion.get(), processorId);
+      return;
+    }
+
+    //Stop current work
+    if (coordinatorListener != null) {
+      coordinatorListener.onJobModelExpired();
+    }
+    // Add entry with new job model version to the processor table
+    table.addProcessorEntity(nextJMVersion, processorId, azureLeaderElector.amILeader());
+
+    String barrierEnd = BarrierState.END.name() + " " + nextJMVersion;
+    String barrierTimedOut = BarrierState.TIMEOUT.name() + " " + nextJMVersion;
+    // Poll the blob after random pauses of under 5 seconds until this barrier is resolved.
+    Random random = new Random();
+    String blobBarrierState = leaderBlob.getBarrierState();
+    while (true) {
+      // Constant-first comparisons: a missing barrier state simply means "keep polling".
+      if (barrierEnd.equals(blobBarrierState)) {
+        LOG.info("Barrier completion detected by the worker for barrier version {}.", nextJMVersion);
+        versionUpgradeDetected.set(false);
+        onNewJobModelConfirmed(nextJMVersion);
+        return;
+      }
+      if (barrierTimedOut.equals(blobBarrierState) || isSupersededOnBlob(nextJMVersion)) {
+        LOG.info("Barrier timed out for version number {}", nextJMVersion);
+        versionUpgradeDetected.set(false);
+        return;
+      }
+      try {
+        Thread.sleep(random.nextInt(5000));
+      } catch (InterruptedException e) {
+        // Restore the interrupt flag (e.g. the owning executor is shutting down) and stop waiting for this barrier.
+        Thread.currentThread().interrupt();
+        LOG.warn("Interrupted while waiting for the barrier of job model version {}.", nextJMVersion);
+        return;
+      }
+      LOG.info("Checking for barrier state on the blob again...");
+      blobBarrierState = leaderBlob.getBarrierState();
+    }
+  }
+
+  /**
+   * @param jmVersion A numeric job model version.
+   * @return true if the blob already holds a job model version newer than {@code jmVersion}.
+   */
+  private boolean isSupersededOnBlob(String jmVersion) {
+    String blobJMVersion = leaderBlob.getJobModelVersion();
+    return blobJMVersion != null && Integer.parseInt(blobJMVersion) > Integer.parseInt(jmVersion);
+  }
+
+  /**
+   * Called when the JC detects that the barrier has completed by checking the barrier state on the blob.
+   * @param nextJMVersion The new job model version after rebalancing.
+   */
+  private void onNewJobModelConfirmed(final String nextJMVersion) {
+    LOG.info("pid={}: new version {} of the job model got confirmed", processorId, nextJMVersion);
+
+    // Delete previous value
+    if (table.getEntity(currentJMVersion.get(), processorId) != null) {
+      table.deleteProcessorEntity(currentJMVersion.get(), processorId);
+    }
+    if (table.getEntity(INITIAL_STATE, processorId) != null) {
+      table.deleteProcessorEntity(INITIAL_STATE, processorId);
+    }
+
+    //Start heartbeating to new entry only when barrier reached.
+    //Changing the current job model version enables that since we are heartbeating to a row identified by the current job model version.
+    currentJMVersion.set(nextJMVersion);
+
+    //Start the container with the new model
+    if (coordinatorListener != null) {
+      coordinatorListener.onNewJobModel(processorId, jobModel);
+    }
+  }
+
+  /**
+   * Resolves this processor's id from config: an explicit processor id wins, otherwise the configured generator is used.
+   * @throws ConfigException if neither is configured.
+   */
+  private String createProcessorId(Config config) {
+    // TODO: This check to be removed after 0.13+
+    ApplicationConfig appConfig = new ApplicationConfig(config);
+    if (appConfig.getProcessorId() != null) {
+      return appConfig.getProcessorId();
+    } else if (StringUtils.isNotBlank(appConfig.getAppProcessorIdGeneratorClass())) {
+      ProcessorIdGenerator idGenerator =
+          ClassLoaderHelper.fromClassName(appConfig.getAppProcessorIdGeneratorClass(), ProcessorIdGenerator.class);
+      return idGenerator.generateProcessorId(config);
+    } else {
+      throw new ConfigException(String
+          .format("Expected either %s or %s to be configured", ApplicationConfig.PROCESSOR_ID,
+              ApplicationConfig.APP_PROCESSOR_ID_GENERATOR_CLASS));
+    }
+  }
+
+  /**
+   * Reacts to this processor winning the leader election.
+   */
+  public class AzureLeaderElectorListener implements LeaderElectorListener {
+    /**
+     * Keep renewing the lease and do the required tasks as a leader.
+     */
+    @Override
+    public void onBecomingLeader() {
+      // Update table to denote that it is a leader.
+      table.updateIsLeader(currentJMVersion.get(), processorId, true);
+
+      // Schedule a task to renew the lease after a fixed time interval
+      LOG.info("Starting scheduler to keep renewing lease held by the leader.");
+      renewLease = new RenewLeaseScheduler((errorMsg) -> {
+        LOG.error(errorMsg);
+        table.updateIsLeader(currentJMVersion.get(), processorId, false);
+        azureLeaderElector.resignLeadership();
+        renewLease.shutdown();
+        // liveness is only created after the first rebalance below, so a renewal failure can precede it.
+        if (liveness != null) {
+          liveness.shutdown();
+        }
+      }, azureLeaderElector.getLeaseBlobManager(), azureLeaderElector.getLeaseId());
+      renewLease.scheduleTask();
+
+      if (!doOnProcessorChange(new ArrayList<>())) {
+        // Publishing failed and the coordinator was stopped; do not start watching for processor changes.
+        return;
+      }
+
+      // Start scheduler to check for change in list of live processors
+      LOG.info("Starting scheduler to check for change in list of live processors in the system.");
+      liveness = new LivenessCheckScheduler(errorHandler, table, leaderBlob, currentJMVersion, processorId);
+      liveness.setStateChangeListener(createLivenessListener(liveness.getLiveProcessors()));
+      liveness.scheduleTask();
+    }
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/c3b447ec/samza-azure/src/main/java/org/apache/samza/coordinator/AzureLeaderElector.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/coordinator/AzureLeaderElector.java b/samza-azure/src/main/java/org/apache/samza/coordinator/AzureLeaderElector.java
new file mode 100644
index 0000000..c93f1d0
--- /dev/null
+++ b/samza-azure/src/main/java/org/apache/samza/coordinator/AzureLeaderElector.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.samza.AzureException;
+import org.apache.samza.util.LeaseBlobManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Class to facilitate leader election in Azure.
+ * The processor that acquires the lease on the blob becomes the leader.
+ * The lease ID is null initially. It is generated by Azure when the processor acquires the lease, and updated accordingly.
+ * Every processor requires a valid active lease ID in order to perform successful write and delete operations on the blob.
+ * Read operations from the blob are not dependent on the lease ID.
+ */
+public class AzureLeaderElector implements LeaderElector {
+
+ private static final Logger LOG = LoggerFactory.getLogger(AzureLeaderElector.class);
+ private static final int LEASE_TIME_IN_SEC = 60;
+ private final LeaseBlobManager leaseBlobManager;
+ private LeaderElectorListener leaderElectorListener = null;
+ private final AtomicReference<String> leaseId;
+ private final AtomicBoolean isLeader;
+
+ public AzureLeaderElector(LeaseBlobManager leaseBlobManager) {
+ this.isLeader = new AtomicBoolean(false);
+ this.leaseBlobManager = leaseBlobManager;
+ this.leaseId = new AtomicReference<>(null);
+ }
+
+ @Override
+ public void setLeaderElectorListener(LeaderElectorListener listener) {
+ this.leaderElectorListener = listener;
+ }
+
+ /**
+ * Tries to become the leader by acquiring a lease on the blob.
+ * The acquireLease operation has a retry policy where upon failure, the operation is tried 3 times at 5 second intervals.
+ * Invokes the listener on becoming the leader.
+ * @throws AzureException If a Azure storage service error occurred. This includes the case where the blob you're trying to lease does not exist.
+ */
+ @Override
+ public void tryBecomeLeader() throws AzureException {
+ leaseId.getAndSet(leaseBlobManager.acquireLease(LEASE_TIME_IN_SEC, leaseId.get()));
+ if (leaseId.get() != null) {
+ LOG.info("Became leader with lease ID {}.", leaseId.get());
+ isLeader.set(true);
+ if (leaderElectorListener != null) {
+ leaderElectorListener.onBecomingLeader();
+ }
+ } else {
+ LOG.info("Unable to become the leader. Continuing as a worker.");
+ }
+ }
+
+ /**
+ * Releases the lease in order to resign leadership. It also stops all schedulers scheduled by the leader.
+ * The releaseLease operation has a retry policy where upon failure, the operation is tried 3 times at 5 second intervals.
+ */
+ @Override
+ public void resignLeadership() {
+ if (isLeader.get()) {
+ leaseBlobManager.releaseLease(leaseId.get());
+ isLeader.set(false);
+ LOG.info("Resigning leadership with lease ID {}", leaseId.get());
+ leaseId.getAndSet(null);
+ } else {
+ LOG.info("Can't release the lease because it is not the leader and does not hold an active lease.");
+ }
+ }
+
+ /**
+ * Checks whether it's a leader
+ * @return true if it is the leader, false otherwise
+ */
+ @Override
+ public boolean amILeader() {
+ return isLeader.get();
+ }
+
+ public AtomicReference<String> getLeaseId() {
+ return leaseId;
+ }
+
+ public LeaseBlobManager getLeaseBlobManager() {
+ return this.leaseBlobManager;
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/c3b447ec/samza-azure/src/main/java/org/apache/samza/coordinator/data/BarrierState.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/coordinator/data/BarrierState.java b/samza-azure/src/main/java/org/apache/samza/coordinator/data/BarrierState.java
new file mode 100644
index 0000000..1c144de
--- /dev/null
+++ b/samza-azure/src/main/java/org/apache/samza/coordinator/data/BarrierState.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator.data;
+
+/**
+ * The states a barrier can be in.
+ */
+public enum BarrierState {
+  /** NOTE(review): presumably the barrier has been started — confirm against usage. */
+  START,
+  /** NOTE(review): presumably the barrier has been completed — confirm against usage. */
+  END,
+  /** NOTE(review): presumably the barrier expired before completing — confirm against usage. */
+  TIMEOUT
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/c3b447ec/samza-azure/src/main/java/org/apache/samza/coordinator/data/JobModelBundle.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/coordinator/data/JobModelBundle.java b/samza-azure/src/main/java/org/apache/samza/coordinator/data/JobModelBundle.java
new file mode 100644
index 0000000..d05e0a5
--- /dev/null
+++ b/samza-azure/src/main/java/org/apache/samza/coordinator/data/JobModelBundle.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator.data;
+
+import org.apache.samza.job.model.JobModel;
+
+
+/**
+ * Holds the previous and the current job model, together with their versions, so that
+ * they can be published to the blob as a single unit.
+ */
+public class JobModelBundle {
+
+  private JobModel prevJobModel;
+  private JobModel currJobModel;
+  private String prevJobModelVersion;
+  private String currJobModelVersion;
+
+  /**
+   * Creates a bundle whose models and versions are all null.
+   * NOTE(review): presumably kept for reflection-based (de)serialization — confirm before removing.
+   */
+  public JobModelBundle() {
+  }
+
+  /**
+   * @param prevJM the previous job model
+   * @param currJM the current job model
+   * @param prevJMV version string of the previous job model
+   * @param currJMV version string of the current job model
+   */
+  public JobModelBundle(JobModel prevJM, JobModel currJM, String prevJMV, String currJMV) {
+    this.prevJobModel = prevJM;
+    this.currJobModel = currJM;
+    this.prevJobModelVersion = prevJMV;
+    this.currJobModelVersion = currJMV;
+  }
+
+  /** @return the current job model (null if the bundle was built with the no-arg constructor) */
+  public JobModel getCurrJobModel() {
+    return this.currJobModel;
+  }
+
+  /** @return the previous job model (null if the bundle was built with the no-arg constructor) */
+  public JobModel getPrevJobModel() {
+    return this.prevJobModel;
+  }
+
+  /** @return the version of the current job model */
+  public String getCurrJobModelVersion() {
+    return this.currJobModelVersion;
+  }
+
+  /** @return the version of the previous job model */
+  public String getPrevJobModelVersion() {
+    return this.prevJobModelVersion;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/c3b447ec/samza-azure/src/main/java/org/apache/samza/coordinator/data/ProcessorEntity.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/coordinator/data/ProcessorEntity.java b/samza-azure/src/main/java/org/apache/samza/coordinator/data/ProcessorEntity.java
new file mode 100644
index 0000000..9323bde
--- /dev/null
+++ b/samza-azure/src/main/java/org/apache/samza/coordinator/data/ProcessorEntity.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator.data;
+
+import com.microsoft.azure.storage.table.TableServiceEntity;
+import java.util.Random;
+
+
+/**
+ * Table schema for Azure processor table.
+ * Denotes a row in the table with PARTITION KEY = Job Model Version and ROW KEY = Processor ID.
+ * Other fields include integer liveness value to which each processor heartbeats,
+ * and boolean isLeader value which denotes whether the processor is a leader or not.
+ */
+public class ProcessorEntity extends TableServiceEntity {
+  // One generator shared by all entities instead of allocating a new Random per instance;
+  // java.util.Random is safe to use from multiple threads.
+  private static final Random RAND = new Random();
+  // NOTE(review): liveness has no public getter/setter pair. If the table SDK persists properties via
+  // getter/setter reflection, this value itself is never written to the table — confirm.
+  private int liveness;
+  private boolean isLeader;
+
+  /**
+   * Creates an entity with no partition key or row key set.
+   * NOTE(review): presumably needed so the table SDK can instantiate entities reflectively — confirm.
+   */
+  public ProcessorEntity() {}
+
+  /**
+   * @param jobModelVersion job model version, used as the partition key
+   * @param processorId processor ID, used as the row key
+   */
+  public ProcessorEntity(String jobModelVersion, String processorId) {
+    this.partitionKey = jobModelVersion;
+    this.rowKey = processorId;
+    this.isLeader = false;
+    this.liveness = nextLivenessValue();
+  }
+
+  /**
+   * Updates heartbeat by updating the liveness value in the table.
+   * Sets the liveness field to a random integer value in order to update the last modified since timestamp of the row in the table.
+   * This asserts to the leader that the processor is alive.
+   */
+  public void updateLiveness() {
+    liveness = nextLivenessValue();
+  }
+
+  /**
+   * @param leader whether this processor is the leader
+   */
+  public void setIsLeader(boolean leader) {
+    isLeader = leader;
+  }
+
+  /**
+   * @return true if this processor is marked as the leader
+   */
+  public boolean getIsLeader() {
+    return isLeader;
+  }
+
+  /** Returns a random liveness value in the range [2, 10001]. */
+  private static int nextLivenessValue() {
+    return RAND.nextInt(10000) + 2;
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/c3b447ec/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/HeartbeatScheduler.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/HeartbeatScheduler.java b/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/HeartbeatScheduler.java
new file mode 100644
index 0000000..2abb380
--- /dev/null
+++ b/samza-azure/src/main/java/org/apache/samza/coordinator/scheduler/HeartbeatScheduler.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator.scheduler;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import org.apache.samza.util.TableUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Scheduler invoked by each processor for heartbeating to a row of the table.
+ * Heartbeats every 5 seconds.
+ * The row is determined by the job model version and processor id passed to the scheduler.
+ * All time units are in SECONDS.
+ */
+public class HeartbeatScheduler implements TaskScheduler {
+
+  private static final Logger LOG = LoggerFactory.getLogger(HeartbeatScheduler.class);
+  private static final long HEARTBEAT_DELAY_SEC = 5;
+  private static final ThreadFactory PROCESSOR_THREAD_FACTORY =
+      new ThreadFactoryBuilder().setNameFormat("HeartbeatScheduler-%d").build();
+  private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(PROCESSOR_THREAD_FACTORY);
+  private final String processorId;
+  private final TableUtils table;
+  private final AtomicReference<String> currentJMVersion;
+  private final Consumer<String> errorHandler;
+
+  /**
+   * @param errorHandler invoked with an error message whenever a heartbeat update throws
+   * @param table table the heartbeats are written to
+   * @param currentJMVersion holder of the current job model version; re-read on every heartbeat
+   * @param pid ID of this processor
+   */
+  public HeartbeatScheduler(Consumer<String> errorHandler, TableUtils table, AtomicReference<String> currentJMVersion, final String pid) {
+    this.table = table;
+    this.currentJMVersion = currentJMVersion;
+    processorId = pid;
+    this.errorHandler = errorHandler;
+  }
+
+  /**
+   * Schedules the heartbeat task to run every HEARTBEAT_DELAY_SEC seconds (fixed delay), starting after one delay.
+   * @return the future of the scheduled periodic task
+   */
+  @Override
+  public ScheduledFuture scheduleTask() {
+    return scheduler.scheduleWithFixedDelay(() -> {
+      try {
+        String currJVM = currentJMVersion.get();
+        LOG.info("Updating heartbeat for processor ID: {} and job model version: {}", processorId, currJVM);
+        table.updateHeartbeat(currJVM, processorId);
+      } catch (Exception e) {
+        // The handler only receives a generic message, so record the actual cause here.
+        LOG.error("Exception in Heartbeat Scheduler.", e);
+        errorHandler.accept("Exception in Heartbeat Scheduler. Stopping the processor...");
+      }
+    }, HEARTBEAT_DELAY_SEC, HEARTBEAT_DELAY_SEC, TimeUnit.SECONDS);
+  }
+
+  /** Heartbeating reports no state changes; the listener is ignored. */
+  @Override
+  public void setStateChangeListener(SchedulerStateChangeListener listener) {}
+
+  /** Stops heartbeating immediately via shutdownNow, interrupting an in-flight heartbeat if any. */
+  @Override
+  public void shutdown() {
+    LOG.info("Shutting down HeartbeatScheduler");
+    scheduler.shutdownNow();
+  }
+}
\ No newline at end of file