You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2011/10/18 19:57:30 UTC

svn commit: r1185774 - in /incubator/kafka/trunk/clients/php/src/lib/Kafka: Encoder.php Message.php Producer.php

Author: junrao
Date: Tue Oct 18 17:57:29 2011
New Revision: 1185774

URL: http://svn.apache.org/viewvc?rev=1185774&view=rev
Log:
Php Client support for compression attribute; patched by AaronR; KAFKA-159

Modified:
    incubator/kafka/trunk/clients/php/src/lib/Kafka/Encoder.php
    incubator/kafka/trunk/clients/php/src/lib/Kafka/Message.php
    incubator/kafka/trunk/clients/php/src/lib/Kafka/Producer.php

Modified: incubator/kafka/trunk/clients/php/src/lib/Kafka/Encoder.php
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/php/src/lib/Kafka/Encoder.php?rev=1185774&r1=1185773&r2=1185774&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/php/src/lib/Kafka/Encoder.php (original)
+++ incubator/kafka/trunk/clients/php/src/lib/Kafka/Encoder.php Tue Oct 18 17:57:29 2011
@@ -27,11 +27,12 @@ class Kafka_Encoder
 	 * 
 	 * @var integer
 	 */
-	const CURRENT_MAGIC_VALUE = 0;
+	const CURRENT_MAGIC_VALUE = 1;
 	
 	/**
 	 * Encode a message. The format of an N byte message is the following:
      *  - 1 byte: "magic" identifier to allow format changes
+     *  - 1 byte:  "compression-attributes" for compression alogrithm
      *  - 4 bytes: CRC32 of the payload
      *  - (N - 5) bytes: payload
 	 * 
@@ -39,9 +40,9 @@ class Kafka_Encoder
 	 *
 	 * @return string
 	 */
-	static public function encode_message($msg) {
-		// <MAGIC_BYTE: 1 byte> <CRC32: 4 bytes bigendian> <PAYLOAD: N bytes>
-		return pack('CN', self::CURRENT_MAGIC_VALUE, crc32($msg)) 
+	static public function encode_message($msg, $compression) {
+		// <MAGIC_BYTE: 1 byte> <COMPRESSION: 1 byte> <CRC32: 4 bytes bigendian> <PAYLOAD: N bytes>
+		return pack('CCN', self::CURRENT_MAGIC_VALUE, $compression, crc32($msg)) 
 			 . $msg;
 	}
 
@@ -51,14 +52,15 @@ class Kafka_Encoder
 	 * @param string  $topic     Topic
 	 * @param integer $partition Partition number
 	 * @param array   $messages  Array of messages to send
+	 * @param compression $compression flag for type of compression 
 	 *
 	 * @return string
 	 */
-	static public function encode_produce_request($topic, $partition, array $messages) {
+	static public function encode_produce_request($topic, $partition, array $messages, $compression) {
 		// encode messages as <LEN: int><MESSAGE_BYTES>
 		$message_set = '';
 		foreach ($messages as $message) {
-			$encoded = self::encode_message($message);
+			$encoded = self::encode_message($message, $compression);
 			$message_set .= pack('N', strlen($encoded)) . $encoded;
 		}
 		// create the request as <REQUEST_SIZE: int> <REQUEST_ID: short> <TOPIC: bytes> <PARTITION: int> <BUFFER_SIZE: int> <BUFFER: bytes>

Modified: incubator/kafka/trunk/clients/php/src/lib/Kafka/Message.php
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/php/src/lib/Kafka/Message.php?rev=1185774&r1=1185773&r2=1185774&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/php/src/lib/Kafka/Message.php (original)
+++ incubator/kafka/trunk/clients/php/src/lib/Kafka/Message.php Tue Oct 18 17:57:29 2011
@@ -14,6 +14,7 @@
 /**
  * A message. The format of an N byte message is the following:
  * 1 byte "magic" identifier to allow format changes
+ * 1 byte compression-attribute
  * 4 byte CRC32 of the payload
  * N - 5 byte payload
  *
@@ -25,15 +26,6 @@
  */
 class Kafka_Message
 {
-	/*
-	private $currentMagicValue = Kafka_Encoder::CURRENT_MAGIC_VALUE;
-	private $magicOffset   = 0;
-	private $magicLength   = 1;
-	private $crcOffset     = 1; // MagicOffset + MagicLength
-	private $crcLength     = 4;
-	private $payloadOffset = 5; // CrcOffset + CrcLength
-	private $headerSize    = 5; // PayloadOffset
-	*/
 	
 	/**
 	 * @var string
@@ -46,6 +38,11 @@ class Kafka_Message
 	private $size    = 0;
 	
 	/**
+	 * @var integer
+	 */
+	private $compression    = 0;
+	
+	/**
 	 * @var string
 	 */
 	private $crc     = false;
@@ -56,10 +53,12 @@ class Kafka_Message
 	 * @param string $data Message payload
 	 */
 	public function __construct($data) {
-		$this->payload = substr($data, 5);
+		$this->payload = substr($data, 6);
+		$this->compression    = substr($data,1,1);
 		$this->crc     = crc32($this->payload);
 		$this->size    = strlen($this->payload);
 	}
+
 	
 	/**
 	 * Encode a message
@@ -121,7 +120,7 @@ class Kafka_Message
 	 * @return string
 	 */
 	public function __toString() {
-		return 'message(magic = ' . Kafka_Encoder::CURRENT_MAGIC_VALUE . ', crc = ' . $this->crc .
-			', payload = ' . $this->payload . ')';
+		return 'message(magic = ' . Kafka_Encoder::CURRENT_MAGIC_VALUE . ', compression = ' . $this->compression .
+		  ', crc = ' . $this->crc . ', payload = ' . $this->payload . ')';
 	}
 }

Modified: incubator/kafka/trunk/clients/php/src/lib/Kafka/Producer.php
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/php/src/lib/Kafka/Producer.php?rev=1185774&r1=1185773&r2=1185774&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/php/src/lib/Kafka/Producer.php (original)
+++ incubator/kafka/trunk/clients/php/src/lib/Kafka/Producer.php Tue Oct 18 17:57:29 2011
@@ -43,6 +43,11 @@ class Kafka_Producer
 	protected $port;
 
 	/**
+	 * @var integer
+	 */
+	protected $compression;
+
+	/**
 	 * Constructor
 	 * 
 	 * @param integer $host Host 
@@ -52,6 +57,7 @@ class Kafka_Producer
 		$this->request_key = 0;
 		$this->host = $host;
 		$this->port = $port;
+		$this->compression = 0;
 	}
 	
 	/**
@@ -91,7 +97,7 @@ class Kafka_Producer
 	 */
 	public function send(array $messages, $topic, $partition = 0xFFFFFFFF) {
 		$this->connect();
-		return fwrite($this->conn, Kafka_Encoder::encode_produce_request($topic, $partition, $messages));
+		return fwrite($this->conn, Kafka_Encoder::encode_produce_request($topic, $partition, $messages, $this->compression));
 	}
 
 	/**