Posted to user@ignite.apache.org by james wu <ja...@coupang.com> on 2017/10/12 03:02:38 UTC

How to cancel IgniteRunnable on remote node?

Hi:

I designed my Ignite cluster as follows:
1. The cluster has two roles: job submitter (client mode) and worker
(server mode).
2. The job submitter submits IgniteRunnable instances to the worker nodes
via an ExecutorService.
3. Each IgniteRunnable is responsible for receiving and processing Kafka
messages.
4. The client-side submit code adds a shutdown hook that closes the Kafka
consumer in each IgniteRunnable and calls ExecutorService.shutdown(), to
gracefully cancel the remote IgniteRunnable when the client node exits.
5. All server nodes are started from the Ignite release binary package via
ignite.sh. All custom code is packaged as a jar and added to the Ignite libs
directory. The client node is started from a Java main method.
6. When I terminate the client process, the log shows that the shutdown hook
was called, the Kafka consumer close was called, and
ExecutorService.shutdown() was called, but the remote IgniteRunnable keeps
running and processing Kafka messages.

Code like this:

1. Client submit code:
public class IgniteKafkaOrderPaymentCompleteStreamingJob extends IgniteBaseJob {

	private static final int INITIAL_COUNT = 5;

	private static final String PAYMENT_COMPLETED_DATA_PROCESSOR = "paymentCompletedDataProcesser";

	public static void main(String[] args) {
		String springConfigProperty = getSpringPropertiesSuffix();
		ExecutorService executionService =
			createExecutionService(true, IgniteJobConstants.IGNITE_CLUSTER_COMPUTE_ROLE);
		List<IgniteKafkaPaymentCompleteConsumer> consumers = new ArrayList<>();
		// Submit INITIAL_COUNT consumers to the worker nodes.
		for (int i = 0; i < INITIAL_COUNT; i++) {
			IgniteKafkaPaymentCompleteConsumer paymentCompleteStreamingConsumer =
				new IgniteKafkaPaymentCompleteConsumer();
			paymentCompleteStreamingConsumer.setProcesserBeanName(PAYMENT_COMPLETED_DATA_PROCESSOR);
			paymentCompleteStreamingConsumer.setDataProcesserClass(IOrderDataProcesser.class);
			paymentCompleteStreamingConsumer.setConfigProperties(springConfigProperty);
			consumers.add(paymentCompleteStreamingConsumer);
			executionService.submit(paymentCompleteStreamingConsumer);
		}
		AddShutDownHock(executionService, consumers);
	}

}

public abstract class IgniteBaseJob {

	public static IgniteLogger log;
	private static final String SPRING_PROFILE_KEY = "spring.profile.active";

	/**
	 * Creates the executor service for the nodes that have the given role.
	 * @param clientMode
	 * @param roleInstance
	 * @return
	 */
	protected static ExecutorService createExecutionService(Boolean clientMode, String roleInstance) {
		Ignition.setClientMode(clientMode);
		Ignite ignite = initializeIgniteContext("ignite-default.xml");
		IgniteCluster cluster = ignite.cluster();
		ClusterGroup worker =
			cluster.forAttribute(IgniteJobConstants.IGNITE_CLUSTER_GROUP_KEY, roleInstance);
		return ignite.executorService(worker);
	}

	/**
	 * Reads the JVM parameter that is used to indicate which
	 *
	 * @return
	 */
	protected static String getSpringPropertiesSuffix() {
		String springActiveProfile = System.getProperty(SPRING_PROFILE_KEY);
		// Fall back to the production profile when no profile is set.
		return StringUtils.isBlank(springActiveProfile)
			? SpringPropertiesType.production.name() : springActiveProfile;
	}

	protected static Ignite initializeIgniteContext() {
		try {
			Ignite ignite = Ignition.start(
				IgniteBaseJob.class.getClassLoader().getResourceAsStream("fds-ignite-develop.xml"));
			log = Ignition.ignite().log();
			return ignite;
		} catch (Exception e) {
			e.printStackTrace();
			// log is still null if Ignition.start() failed, so guard against an NPE.
			if (log != null) {
				log.error("Ignite context initialize error with the error details" + e);
			}
		}
		return null;
	}

	/**
	 * Initializes the Ignite context from the given Ignite configuration file.
	 * @param fileName
	 * @return
	 */
	protected static Ignite initializeIgniteContext(String fileName) {
		try {
			Ignite ignite = Ignition.start(
				IgniteBaseJob.class.getClassLoader().getResourceAsStream(fileName));
			if (log == null) {
				log = Ignition.ignite().log();
			}
			return ignite;
		} catch (Exception e) {
			e.printStackTrace();
			// log is still null if Ignition.start() failed, so guard against an NPE.
			if (log != null) {
				log.error("Ignite context initialize error with the error details" + e);
			}
		}
		return null;
	}

	/**
	 * Adds the shutdown hook.
	 * @param executionService
	 * @param consumers
	 */
	protected static void AddShutDownHock(ExecutorService executionService,
			List<? extends AbstractIgniteKafkaConsumer> consumers) {
		Runtime.getRuntime().addShutdownHook(new Thread() {
			@Override
			public void run() {
				log.info("***********************Shutdownhock got executed");
				for (AbstractIgniteKafkaConsumer consumer : consumers) {
					consumer.stop();
				}
				executionService.shutdown();
				try {
					executionService.awaitTermination(5000, TimeUnit.MILLISECONDS);
				} catch (InterruptedException e) {
					log.error("Error happens during shutdown the distributed kafka streamer");
					// Restore the interrupted flag that the exception cleared.
					Thread.currentThread().interrupt();
				}
			}
		});
	}

}

2. IgniteRunnable that consumes Kafka messages:

@Getter
@Setter
public class IgniteKafkaPaymentCompleteConsumer extends AbstractIgniteKafkaConsumer<OrderMessage> {
    public static final String PAYMENT_COMPLETE_STREAMING_EXECUTION_CONTEXT =
        "paymentCompleteStreamingExecutionContext";

    public IgniteKafkaPaymentCompleteConsumer() {
        this.kafkaStreamer = new IgniteKafkaStreamer<>();
        this.log = Ignition.ignite().log();
    }

    @Override
    public void run() {
        try {
            log.info("Start to initialize the Payment Complete Ignite stream!");
            initKafkaConsumer(PAYMENT_COMPLETE_STREAMING_EXECUTION_CONTEXT);

            kafkaStreamer.setMultipleTupleExtractor(
                new StreamMultipleTupleExtractor<MessageAndMetadata<byte[], byte[]>, String, String>() {
                    @Override
                    public Map<String, String> extract(MessageAndMetadata<byte[], byte[]> msg) {
                        try {
                            // Check msg for null before dereferencing it.
                            if (msg != null) {
                                String orderMsg = new String(msg.message(), "UTF-8");
                                if (StringUtils.isNotBlank(orderMsg)) {
                                    log.debug("===================>order message:"
                                        + kafkaStreamer.getTopic() + ":::::::" + orderMsg);
                                    OrderMessage orderMessage =
                                        JsonUtils.json2Object(orderMsg, OrderMessage.class);
                                    orderMessage.getAdditionalProperties().put("OrderMessageType",
                                        OrderMessageType.PAYMENT_COMPLETE);
                                    getiDataProcesser().process(orderMessage, application);
                                }
                            }
                        } catch (Exception ex) {
                            log.error("Error happens when try to extract the data from the order message kafka topic"
                                + ExceptionUtils.getFullStackTrace(ex));
                        }

                        // No tuples are returned; each message is handled by the data processor above.
                        return null;
                    }
                });

            kafkaStreamer.start();
            log.info("Ignite kafka stream started!");
        } catch (Exception e) {
            log.error("error happens during starting the ignite kafka stream:" + e);
        }
    }

}

@Getter
@Setter
public abstract class AbstractIgniteKafkaConsumer<T> implements IgniteRunnable {

	protected IgniteLogger log;
	protected IgniteKafkaStreamer<String, String> kafkaStreamer;
	protected ApplicationContext application;
	protected String configProperties;
	protected StreamingExecutionContext streamingExecutionContext;
	private IDataProcesser<T> iDataProcesser;
	private String avroSchemaFile;
	private Class<? extends IDataProcesser> dataProcesserClass;
	private String processerBeanName;
	private String topic;

	public IDataProcesser<T> getiDataProcesser() {
		return iDataProcesser;
	}

	public void setiDataProcesser() {
		this.iDataProcesser =
			SpringContextLoader.getBeanByNameWithType(application, processerBeanName, dataProcesserClass);
	}

	/**
	 * Composes the default consumer config.
	 * @param streamingExecutionContext
	 * @return
	 */
	protected ConsumerConfig createDefaultConsumerConfig(StreamingExecutionContext streamingExecutionContext) {
		Properties props = new Properties();
		props.put("zookeeper.connect", streamingExecutionContext.getZookeeperUrl());
		props.put("group.id", streamingExecutionContext.getGroupId());
		props.put("zookeeper.session.timeout.ms", streamingExecutionContext.getZookerperSessionTimeOut());
		props.put("zookeeper.sync.time.ms", "200");
		props.put("auto.commit.interval.ms", streamingExecutionContext.getAutoCommitInterval());
		return new ConsumerConfig(props);
	}

	/**
	 * Initializes the Kafka streamer.
	 * @param executionContextBeanName
	 */
	protected void initKafkaConsumer(String executionContextBeanName) {
		application = SpringContextLoader.getInstance().initialize(getConfigProperties());
		streamingExecutionContext = SpringContextLoader.getInstance().getBeanByName(
			executionContextBeanName, StreamingExecutionContext.class);
		setiDataProcesser();
		Ignite ignite = Ignition.ignite();
		log = ignite.log();
		kafkaStreamer.setIgnite(ignite);
		kafkaStreamer.setTopic(streamingExecutionContext.getTopic());
		kafkaStreamer.setThreads(4);
		kafkaStreamer.setConsumerConfig(createDefaultConsumerConfig(streamingExecutionContext));
	}

	/**
	 * Stops the Kafka consumer when the Ignite job is shut down.
	 */
	public void stop() {
		if (kafkaStreamer != null)
			kafkaStreamer.stop();
		log.info("Ignite kafka stream stopped!");
	}

}

Could you help answer my question?

Thanks

James





--
Sent from: http://apache-ignite-users.70518.x6.nabble.com/

Re: How to cancel IgniteRunnable on remote node?

Posted by Andrey Mashenkov <an...@gmail.com>.
Hi,

1. The Ignite.executorService() method returns an implementation of the JDK
ExecutorService interface.
2. The ExecutorService.shutdown() javadoc says:
* Initiates an orderly shutdown in which previously submitted
* tasks are executed, but no new tasks will be accepted.
* Invocation has no additional effect if already shut down.

So nothing there says that running tasks will be terminated.

3. It looks like you have to use ExecutorService.shutdownNow(), which will
discard all queued tasks and interrupt the tasks that are running.
Javadoc:
* Attempts to stop all actively executing tasks, halts the
* processing of waiting tasks, and returns a list of the tasks
* that were awaiting execution.

Your tasks should also support cancellation via the thread's interrupted
flag. Javadoc:
* There are no guarantees beyond best-effort attempts to stop
* processing actively executing tasks. For example, typical
* implementations will cancel via {@link Thread#interrupt}, so any
* task that fails to respond to interrupts may never terminate.
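
To illustrate the difference between the two calls, here is a minimal sketch
that uses a local JDK thread pool instead of an Ignite cluster. The class
names ShutdownDemo and PollingTask are hypothetical, and the sleep loop only
stands in for a Kafka polling loop. It assumes that the Ignite executor
service delivers the interrupt to the remote task in the same way, which
this local example does not verify.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ShutdownDemo {

    /** Hypothetical long-running task that loops until its thread is interrupted. */
    static final class PollingTask implements Runnable {
        final CountDownLatch started = new CountDownLatch(1);

        @Override
        public void run() {
            started.countDown();
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    // Stand-in for polling and processing one batch of messages.
                    Thread.sleep(10);
                }
            } catch (InterruptedException e) {
                // sleep() clears the flag when it throws, so restore it.
                Thread.currentThread().interrupt();
            }
            // Resources (for example a streamer) would be released here.
        }
    }

    /**
     * Returns two flags: whether the pool terminated after shutdown(),
     * and whether it terminated after shutdownNow().
     */
    static boolean[] demo() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        PollingTask task = new PollingTask();
        executor.submit(task);
        try {
            // Wait until the task is actually running.
            task.started.await();

            // shutdown() only stops new submissions; the running task continues.
            executor.shutdown();
            boolean afterShutdown = executor.awaitTermination(200, TimeUnit.MILLISECONDS);

            // shutdownNow() interrupts the worker thread; the task sees it and returns.
            executor.shutdownNow();
            boolean afterShutdownNow = executor.awaitTermination(5, TimeUnit.SECONDS);

            return new boolean[] {afterShutdown, afterShutdownNow};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("demo thread was interrupted", e);
        }
    }

    public static void main(String[] args) {
        boolean[] result = demo();
        System.out.println("terminated after shutdown(): " + result[0]);
        System.out.println("terminated after shutdownNow(): " + result[1]);
    }
}
```

A task that never checks the interrupted flag and never calls a blocking
method that throws InterruptedException would keep running even after
shutdownNow().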






-- 
Best regards,
Andrey V. Mashenkov

Re: How to cancel IgniteRunnable on remote node?

Posted by james wu <ja...@coupang.com>.
Thanks a lot.

Can the remote IgniteRunnable be shut down gracefully if I implement the
ComputeJobMasterLeaveAware interface in my Kafka consumer IgniteRunnable and
close the Kafka streamer in onMasterNodeLeft()?

Also, for the remote server to receive the master-node-left event, is there
any configuration that needs to be added to IgniteConfiguration?

Thanks

James


