Posted to commits@flink.apache.org by se...@apache.org on 2014/08/18 19:25:55 UTC

[18/51] [abbrv] git commit: [streaming] Javadocs for connectors

[streaming] Javadocs for connectors


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/2f704aeb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/2f704aeb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/2f704aeb

Branch: refs/heads/master
Commit: 2f704aeb3eb94a7b24641408699d54cd40a4d8cb
Parents: 7c5bc3c
Author: jfeher <fe...@gmail.com>
Authored: Fri Aug 1 11:22:31 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Aug 18 16:21:13 2014 +0200

----------------------------------------------------------------------
 .../streaming/connectors/flume/FlumeSink.java   | 51 +++++++++++++++---
 .../streaming/connectors/flume/FlumeSource.java | 57 ++++++++++++++++++--
 .../connectors/flume/FlumeTopology.java         |  2 +-
 .../streaming/connectors/kafka/KafkaSink.java   | 22 ++++++++
 .../streaming/connectors/kafka/KafkaSource.java | 36 +++++++++----
 .../streaming/connectors/rabbitmq/RMQSink.java  | 29 +++++++++-
 .../connectors/rabbitmq/RMQSource.java          | 27 +++++++++-
 7 files changed, 201 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2f704aeb/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
index 3379fcb..20a3a4a 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
@@ -21,21 +21,22 @@ package org.apache.flink.streaming.connectors.flume;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.streaming.api.DataStream;
+import org.apache.flink.streaming.api.function.sink.SinkFunction;
+import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
 import org.apache.flume.Event;
 import org.apache.flume.EventDeliveryException;
 import org.apache.flume.FlumeException;
 import org.apache.flume.api.RpcClient;
 import org.apache.flume.api.RpcClientFactory;
 import org.apache.flume.event.EventBuilder;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.streaming.api.function.sink.SinkFunction;
-import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
 
 public abstract class FlumeSink<IN extends Tuple> extends SinkFunction<IN> {
 	private static final long serialVersionUID = 1L;
 
 	private static final Log LOG = LogFactory.getLog(RMQSource.class);
-	
+
 	private transient FlinkRpcClientFacade client;
 	boolean initDone = false;
 	String host;
@@ -48,6 +49,13 @@ public abstract class FlumeSink<IN extends Tuple> extends SinkFunction<IN> {
 		this.port = port;
 	}
 
+	/**
+	 * Receives tuples from the Apache Flink {@link DataStream} and forwards them to
+	 * Apache Flume.
+	 * 
+	 * @param tuple
+	 *            The tuple arriving from the datastream
+	 */
 	@Override
 	public void invoke(IN tuple) {
 
@@ -66,6 +74,13 @@ public abstract class FlumeSink<IN extends Tuple> extends SinkFunction<IN> {
 
 	}
 
+	/**
+	 * Serializes tuples into byte arrays.
+	 * 
+	 * @param tuple
+	 *            The tuple used for the serialization
+	 * @return The serialized byte array.
+	 */
 	public abstract byte[] serialize(IN tuple);
 
 	private class FlinkRpcClientFacade {
@@ -73,6 +88,14 @@ public abstract class FlumeSink<IN extends Tuple> extends SinkFunction<IN> {
 		private String hostname;
 		private int port;
 
+		/**
+		 * Initializes the connection to Apache Flume.
+		 * 
+		 * @param hostname
+		 *            The host
+		 * @param port
+		 *            The port.
+		 */
 		public void init(String hostname, int port) {
 			// Setup the RPC connection
 			this.hostname = hostname;
@@ -80,12 +103,13 @@ public abstract class FlumeSink<IN extends Tuple> extends SinkFunction<IN> {
 			int initCounter = 0;
 			while (true) {
 				if (initCounter >= 90) {
-					System.exit(1);
+					throw new RuntimeException("Cannot establish connection with " + port + " at " + host);
 				}
 				try {
 					this.client = RpcClientFactory.getDefaultInstance(hostname, port);
 				} catch (FlumeException e) {
-					// Wait one second if the connection failed before the next try
+					// Wait one second if the connection failed before the next
+					// try
 					try {
 						Thread.sleep(1000);
 					} catch (InterruptedException e1) {
@@ -102,6 +126,12 @@ public abstract class FlumeSink<IN extends Tuple> extends SinkFunction<IN> {
 			initDone = true;
 		}
 
+		/**
+		 * Sends byte arrays as {@link Event} series to Apache Flume.
+		 * 
+		 * @param data
+		 *            The byte array to send to Apache Flume
+		 */
 		public void sendDataToFlume(byte[] data) {
 			Event event = EventBuilder.withBody(data);
 
@@ -116,16 +146,25 @@ public abstract class FlumeSink<IN extends Tuple> extends SinkFunction<IN> {
 			}
 		}
 
+		/**
+		 * Closes the RpcClient.
+		 */
 		public void close() {
 			client.close();
 		}
 
 	}
 
+	/**
+	 * Closes the connection only when the next message is sent after this call.
+	 */
 	public void sendAndClose() {
 		sendAndClose = true;
 	}
 
+	/**
+	 * Closes the connection immediately and no further data will be sent.
+	 */
 	public void closeWithoutSend() {
 		client.close();
 		closeWithoutSend = true;
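
The new Javadocs above make the contract explicit: a concrete FlumeSink only has to supply serialize(IN). A minimal sketch of such a subclass, not part of this commit, with the (host, port) constructor arguments assumed from the host/port fields in the hunk above:

import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.streaming.connectors.flume.FlumeSink;

// Hypothetical example: forwards the String field of each Tuple1 to Flume.
public class StringFlumeSink extends FlumeSink<Tuple1<String>> {
    private static final long serialVersionUID = 1L;

    public StringFlumeSink(String host, int port) {
        super(host, port); // assumed constructor, inferred from the host/port fields above
    }

    @Override
    public byte[] serialize(Tuple1<String> tuple) {
        // turn the tuple into the byte array handed to sendDataToFlume()
        return tuple.f0.getBytes();
    }
}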

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2f704aeb/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java
index 5b90e14..8b102a8 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java
@@ -26,8 +26,8 @@ import org.apache.flume.channel.ChannelProcessor;
 import org.apache.flume.source.AvroSource;
 import org.apache.flume.source.avro.AvroFlumeEvent;
 import org.apache.flume.source.avro.Status;
-
 import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.streaming.api.DataStream;
 import org.apache.flink.streaming.api.function.source.SourceFunction;
 import org.apache.flink.util.Collector;
 
@@ -45,12 +45,28 @@ public abstract class FlumeSource<IN extends Tuple> extends SourceFunction<IN> {
 	public class MyAvroSource extends AvroSource {
 		Collector<IN> collector;
 
+		/**
+		 * Sends the AvroFlumeEvent from its argument list to the Apache Flink
+		 * {@link DataStream}.
+		 * 
+		 * @param avroEvent
+		 *            The event that should be sent to the dataStream
+		 * @return A {@link Status}.OK message if sending the event was successful.
+		 */
 		@Override
 		public Status append(AvroFlumeEvent avroEvent) {
 			collect(avroEvent);
 			return Status.OK;
 		}
 
+		/**
+		 * Sends the AvroFlumeEvents from its argument list to the Apache Flink
+		 * {@link DataStream}.
+		 * 
+		 * @param events
+		 *            The events that are sent to the dataStream
+		 * @return A Status.OK message if sending the events was successful.
+		 */
 		@Override
 		public Status appendBatch(List<AvroFlumeEvent> events) {
 			for (AvroFlumeEvent avroEvent : events) {
@@ -60,6 +76,13 @@ public abstract class FlumeSource<IN extends Tuple> extends SourceFunction<IN> {
 			return Status.OK;
 		}
 
+		/**
+		 * Deserializes the AvroFlumeEvent before sending it to the Apache Flink
+		 * {@link DataStream}.
+		 * 
+		 * @param avroEvent
+		 *            The event that is sent to the dataStream
+		 */
 		private void collect(AvroFlumeEvent avroEvent) {
 			byte[] b = avroEvent.getBody().array();
 			IN tuple = FlumeSource.this.deserialize(b);
@@ -78,8 +101,22 @@ public abstract class FlumeSource<IN extends Tuple> extends SourceFunction<IN> {
 	private boolean sendAndClose = false;
 	private volatile boolean sendDone = false;
 
-	public abstract IN deserialize(byte[] msg);
-
+	/**
+	 * Deserializes the incoming data.
+	 * 
+	 * @param message
+	 *            The incoming message in a byte array
+	 * @return The deserialized message in the required format.
+	 */
+	public abstract IN deserialize(byte[] message);
+
+	/**
+	 * Configures the AvroSource. Also sets the collector so the application can
+	 * use it from outside of the invoke function.
+	 * 
+	 * @param collector
+	 *            The collector used in the invoke function
+	 */
 	public void configureAvroSource(Collector<IN> collector) {
 
 		avroSource = new MyAvroSource();
@@ -88,10 +125,18 @@ public abstract class FlumeSource<IN extends Tuple> extends SourceFunction<IN> {
 		context.put("port", port);
 		context.put("bind", host);
 		avroSource.configure(context);
+		// An instance of a ChannelProcessor is required for configuring the
+		// avroSource although it will not be used in this case.
 		ChannelProcessor cp = new ChannelProcessor(null);
 		avroSource.setChannelProcessor(cp);
 	}
 
+	/**
+	 * Configures the AvroSource and runs until the user calls a close function.
+	 * 
+	 * @param collector
+	 *            The Collector for sending data to the datastream
+	 */
 	@Override
 	public void invoke(Collector<IN> collector) throws Exception {
 		configureAvroSource(collector);
@@ -104,10 +149,16 @@ public abstract class FlumeSource<IN extends Tuple> extends SourceFunction<IN> {
 		avroSource.stop();
 	}
 
+	/**
+	 * Closes the connection only when the next message is sent after this call.
+	 */
 	public void sendAndClose() {
 		sendAndClose = true;
 	}
 
+	/**
+	 * Closes the connection immediately and no further data will be sent.
+	 */
 	public void closeWithoutSend() {
 		closeWithoutSend = true;
 	}
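
On the source side, the documented hook is deserialize(byte[]). A minimal sketch of a concrete FlumeSource, again hypothetical; its constructor signature is not part of this diff and is assumed to be (host, port):

import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.streaming.connectors.flume.FlumeSource;

// Hypothetical example: turns the body of each Flume event into a Tuple1<String>.
public class StringFlumeSource extends FlumeSource<Tuple1<String>> {
    private static final long serialVersionUID = 1L;

    public StringFlumeSource(String host, int port) {
        super(host, port); // assumed constructor; not shown in this diff
    }

    @Override
    public Tuple1<String> deserialize(byte[] message) {
        // called from collect() before the tuple is forwarded to the collector
        return new Tuple1<String>(new String(message));
    }
}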

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2f704aeb/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java
index 779a5fb..3c45cd4 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java
@@ -39,7 +39,7 @@ public class FlumeTopology {
 				try {
 					sendAndClose();
 				} catch (Exception e) {
-					new RuntimeException("Error while closing RMQ connection with " + port + " at "
+					throw new RuntimeException("Error while closing Flume connection with " + port + " at "
 							+ host, e);
 				}
 			}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2f704aeb/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSink.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSink.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSink.java
index 44867ef..7e3f3db 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSink.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSink.java
@@ -45,6 +45,9 @@ public abstract class KafkaSink<IN extends Tuple, OUT> extends SinkFunction<IN>
 
 	}
 
+	/**
+	 * Initializes the connection to Kafka.
+	 */
 	public void initialize() {
 		props = new Properties();
 
@@ -57,6 +60,12 @@ public abstract class KafkaSink<IN extends Tuple, OUT> extends SinkFunction<IN>
 		initDone = true;
 	}
 
+	/**
+	 * Called when new data arrives at the sink and forwards it to Kafka.
+	 * 
+	 * @param tuple
+	 *            The incoming data
+	 */
 	@Override
 	public void invoke(IN tuple) {
 		if (!initDone) {
@@ -75,13 +84,26 @@ public abstract class KafkaSink<IN extends Tuple, OUT> extends SinkFunction<IN>
 		}
 	}
 
+	/**
+	 * Serializes tuples into byte arrays.
+	 * 
+	 * @param tuple
+	 *            The tuple used for the serialization
+	 * @return The serialized byte array.
+	 */
 	public abstract OUT serialize(IN tuple);
 
+	/**
+	 * Closes the connection immediately and no further data will be sent.
+	 */
 	public void closeWithoutSend() {
 		producer.close();
 		closeWithoutSend = true;
 	}
 
+	/**
+	 * Closes the connection only when the next message is sent after this call.
+	 */
 	public void sendAndClose() {
 		sendAndClose = true;
 	}
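
For KafkaSink the user supplies serialize(IN) returning the OUT type handed to the producer. A minimal hypothetical subclass; the (topicId, brokerAddr) constructor arguments are assumptions, since the constructor is not visible in this diff, and OUT is String, in line with the String-only note removed from KafkaSource below:

import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.streaming.connectors.kafka.KafkaSink;

// Hypothetical example: writes the String field of each Tuple1 to Kafka.
public class StringKafkaSink extends KafkaSink<Tuple1<String>, String> {
    private static final long serialVersionUID = 1L;

    public StringKafkaSink(String topicId, String brokerAddr) {
        super(topicId, brokerAddr); // assumed constructor, not shown in this diff
    }

    @Override
    public String serialize(Tuple1<String> tuple) {
        return tuple.f0;
    }
}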

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2f704aeb/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSource.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSource.java
index 32b11d9..623e3b8 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSource.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSource.java
@@ -28,14 +28,12 @@ import kafka.consumer.ConsumerConfig;
 import kafka.consumer.ConsumerIterator;
 import kafka.consumer.KafkaStream;
 import kafka.javaapi.consumer.ConsumerConnector;
+
 import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.streaming.api.DataStream;
 import org.apache.flink.streaming.api.function.source.SourceFunction;
 import org.apache.flink.util.Collector;
 
-/**
- * Source for reading messages from a Kafka queue. The source currently only
- * support string messages.
- */
 public abstract class KafkaSource<IN extends Tuple> extends SourceFunction<IN> {
 	private static final long serialVersionUID = 1L;
 
@@ -49,14 +47,16 @@ public abstract class KafkaSource<IN extends Tuple> extends SourceFunction<IN> {
 
 	IN outTuple;
 
-	public KafkaSource(String zkQuorum, String groupId, String topicId,
-			int numThreads) {
+	public KafkaSource(String zkQuorum, String groupId, String topicId, int numThreads) {
 		this.zkQuorum = zkQuorum;
 		this.groupId = groupId;
 		this.topicId = topicId;
 		this.numThreads = numThreads;
 	}
 
+	/**
+	 * Initializes the connection to Kafka.
+	 */
 	private void initializeConnection() {
 		Properties props = new Properties();
 		props.put("zookeeper.connect", zkQuorum);
@@ -64,10 +64,15 @@ public abstract class KafkaSource<IN extends Tuple> extends SourceFunction<IN> {
 		props.put("zookeeper.session.timeout.ms", "400");
 		props.put("zookeeper.sync.time.ms", "200");
 		props.put("auto.commit.interval.ms", "1000");
-		consumer = kafka.consumer.Consumer
-				.createJavaConsumerConnector(new ConsumerConfig(props));
+		consumer = kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
 	}
 
+	/**
+	 * Called to forward the data from the source to the {@link DataStream}.
+	 * 
+	 * @param collector
+	 *            The Collector for sending data to the dataStream
+	 */
 	@Override
 	public void invoke(Collector<IN> collector) throws Exception {
 		initializeConnection();
@@ -92,12 +97,25 @@ public abstract class KafkaSource<IN extends Tuple> extends SourceFunction<IN> {
 		consumer.shutdown();
 	}
 
-	public abstract IN deserialize(byte[] msg);
+	/**
+	 * Deserializes the incoming data.
+	 * 
+	 * @param message
+	 *            The incoming message in a byte array
+	 * @return The deserialized message in the required format.
+	 */
+	public abstract IN deserialize(byte[] message);
 
+	/**
+	 * Closes the connection immediately and no further data will be sent.
+	 */
 	public void closeWithoutSend() {
 		closeWithoutSend = true;
 	}
 
+	/**
+	 * Closes the connection only when the next message is sent after this call.
+	 */
 	public void sendAndClose() {
 		sendAndClose = true;
 	}
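
A matching hypothetical KafkaSource subclass that emits each Kafka message as a Tuple1<String>; the four-argument constructor is the one visible in the hunk above:

import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.streaming.connectors.kafka.KafkaSource;

// Hypothetical example: deserializes each Kafka message into a Tuple1<String>.
public class StringKafkaSource extends KafkaSource<Tuple1<String>> {
    private static final long serialVersionUID = 1L;

    public StringKafkaSource(String zkQuorum, String groupId, String topicId, int numThreads) {
        super(zkQuorum, groupId, topicId, numThreads);
    }

    @Override
    public Tuple1<String> deserialize(byte[] message) {
        // invoked for every message pulled from the Kafka stream before it is collected
        return new Tuple1<String>(new String(message));
    }
}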

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2f704aeb/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
index 663fc13..d7ed17a 100644
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
@@ -50,6 +50,9 @@ public abstract class RMQSink<IN extends Tuple> extends SinkFunction<IN> {
 		this.QUEUE_NAME = QUEUE_NAME;
 	}
 
+	/**
+	 * Initializes the connection to RMQ.
+	 */
 	public void initializeConnection() {
 		factory = new ConnectionFactory();
 		factory.setHost(HOST_NAME);
@@ -64,6 +67,12 @@ public abstract class RMQSink<IN extends Tuple> extends SinkFunction<IN> {
 		initDone = true;
 	}
 
+	/**
+	 * Called when new data arrives at the sink and forwards it to RMQ.
+	 * 
+	 * @param tuple
+	 *            The incoming data
+	 */
 	@Override
 	public void invoke(IN tuple) {
 		if (!initDone) {
@@ -87,8 +96,18 @@ public abstract class RMQSink<IN extends Tuple> extends SinkFunction<IN> {
 		}
 	}
 
-	public abstract byte[] serialize(Tuple t);
-
+	/**
+	 * Serializes tuples into byte arrays.
+	 * 
+	 * @param tuple
+	 *            The tuple used for the serialization
+	 * @return The serialized byte array.
+	 */
+	public abstract byte[] serialize(Tuple tuple);
+
+	/**
+	 * Closes the connection.
+	 */
 	private void closeChannel() {
 		try {
 			channel.close();
@@ -100,11 +119,17 @@ public abstract class RMQSink<IN extends Tuple> extends SinkFunction<IN> {
 
 	}
 
+	/**
+	 * Closes the connection immediately and no further data will be sent.
+	 */
 	public void closeWithoutSend() {
 		closeChannel();
 		closeWithoutSend = true;
 	}
 
+	/**
+	 * Closes the connection only when the next message is sent after this call.
+	 */
 	public void sendAndClose() {
 		sendAndClose = true;
 	}
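
For RMQSink, note that serialize() is declared on the raw Tuple type in the hunk above, so a subclass recovers the concrete tuple type itself. A minimal hypothetical subclass; the (hostName, queueName) constructor arguments are an assumption inferred from the HOST_NAME and QUEUE_NAME fields:

import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.streaming.connectors.rabbitmq.RMQSink;

// Hypothetical example: sends the first field of each tuple to RabbitMQ as bytes.
public class StringRMQSink extends RMQSink<Tuple1<String>> {
    private static final long serialVersionUID = 1L;

    public StringRMQSink(String hostName, String queueName) {
        super(hostName, queueName); // assumed constructor, inferred from the fields above
    }

    @Override
    public byte[] serialize(Tuple tuple) {
        // serialize() receives a raw Tuple, so cast back to the concrete type
        return ((Tuple1<?>) tuple).f0.toString().getBytes();
    }
}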

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2f704aeb/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
index 4fd2235..dfea55a 100755
--- a/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
+++ b/flink-addons/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
@@ -30,6 +30,7 @@ import com.rabbitmq.client.ConnectionFactory;
 import com.rabbitmq.client.QueueingConsumer;
 
 import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.streaming.api.DataStream;
 import org.apache.flink.streaming.api.function.source.SourceFunction;
 import org.apache.flink.util.Collector;
 
@@ -56,6 +57,9 @@ public abstract class RMQSource<IN extends Tuple> extends SourceFunction<IN> {
 		this.QUEUE_NAME = QUEUE_NAME;
 	}
 
+	/**
+	 * Initializes the connection to RMQ.
+	 */
 	private void initializeConnection() {
 		factory = new ConnectionFactory();
 		factory.setHost(HOST_NAME);
@@ -71,6 +75,12 @@ public abstract class RMQSource<IN extends Tuple> extends SourceFunction<IN> {
 		}
 	}
 
+	/**
+	 * Called to forward the data from the source to the {@link DataStream}.
+	 * 
+	 * @param collector
+	 *            The Collector for sending data to the dataStream
+	 */
 	@Override
 	public void invoke(Collector<IN> collector) throws Exception {
 		initializeConnection();
@@ -105,12 +115,25 @@ public abstract class RMQSource<IN extends Tuple> extends SourceFunction<IN> {
 
 	}
 
-	public abstract IN deserialize(byte[] t);
-
+	/**
+	 * Deserializes the incoming data.
+	 * 
+	 * @param message
+	 *            The incoming message in a byte array
+	 * @return The deserialized message in the required format.
+	 */
+	public abstract IN deserialize(byte[] message);
+
+	/**
+	 * Closes the connection immediately and no further data will be sent.
+	 */
 	public void closeWithoutSend() {
 		closeWithoutSend = true;
 	}
 
+	/**
+	 * Closes the connection only when the next message is sent after this call.
+	 */
 	public void sendAndClose() {
 		sendAndClose = true;
 	}
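
And the corresponding hypothetical RMQSource subclass, again assuming a (hostName, queueName) constructor that this diff does not show:

import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;

// Hypothetical example: emits each RabbitMQ message as a Tuple1<String>.
public class StringRMQSource extends RMQSource<Tuple1<String>> {
    private static final long serialVersionUID = 1L;

    public StringRMQSource(String hostName, String queueName) {
        super(hostName, queueName); // assumed constructor, not shown in this diff
    }

    @Override
    public Tuple1<String> deserialize(byte[] message) {
        return new Tuple1<String>(new String(message));
    }
}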