You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2011/10/18 19:57:30 UTC
svn commit: r1185774 - in /incubator/kafka/trunk/clients/php/src/lib/Kafka:
Encoder.php Message.php Producer.php
Author: junrao
Date: Tue Oct 18 17:57:29 2011
New Revision: 1185774
URL: http://svn.apache.org/viewvc?rev=1185774&view=rev
Log:
Php Client support for compression attribute; patched by AaronR; KAFKA-159
Modified:
incubator/kafka/trunk/clients/php/src/lib/Kafka/Encoder.php
incubator/kafka/trunk/clients/php/src/lib/Kafka/Message.php
incubator/kafka/trunk/clients/php/src/lib/Kafka/Producer.php
Modified: incubator/kafka/trunk/clients/php/src/lib/Kafka/Encoder.php
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/php/src/lib/Kafka/Encoder.php?rev=1185774&r1=1185773&r2=1185774&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/php/src/lib/Kafka/Encoder.php (original)
+++ incubator/kafka/trunk/clients/php/src/lib/Kafka/Encoder.php Tue Oct 18 17:57:29 2011
@@ -27,11 +27,12 @@ class Kafka_Encoder
*
* @var integer
*/
- const CURRENT_MAGIC_VALUE = 0;
+ const CURRENT_MAGIC_VALUE = 1;
/**
* Encode a message. The format of an N byte message is the following:
* - 1 byte: "magic" identifier to allow format changes
+ * - 1 byte: "compression-attributes" for compression alogrithm
* - 4 bytes: CRC32 of the payload
* - (N - 5) bytes: payload
*
@@ -39,9 +40,9 @@ class Kafka_Encoder
*
* @return string
*/
- static public function encode_message($msg) {
- // <MAGIC_BYTE: 1 byte> <CRC32: 4 bytes bigendian> <PAYLOAD: N bytes>
- return pack('CN', self::CURRENT_MAGIC_VALUE, crc32($msg))
+ static public function encode_message($msg, $compression) {
+ // <MAGIC_BYTE: 1 byte> <COMPRESSION: 1 byte> <CRC32: 4 bytes bigendian> <PAYLOAD: N bytes>
+ return pack('CCN', self::CURRENT_MAGIC_VALUE, $compression, crc32($msg))
. $msg;
}
@@ -51,14 +52,15 @@ class Kafka_Encoder
* @param string $topic Topic
* @param integer $partition Partition number
* @param array $messages Array of messages to send
+ * @param compression $compression flag for type of compression
*
* @return string
*/
- static public function encode_produce_request($topic, $partition, array $messages) {
+ static public function encode_produce_request($topic, $partition, array $messages, $compression) {
// encode messages as <LEN: int><MESSAGE_BYTES>
$message_set = '';
foreach ($messages as $message) {
- $encoded = self::encode_message($message);
+ $encoded = self::encode_message($message, $compression);
$message_set .= pack('N', strlen($encoded)) . $encoded;
}
// create the request as <REQUEST_SIZE: int> <REQUEST_ID: short> <TOPIC: bytes> <PARTITION: int> <BUFFER_SIZE: int> <BUFFER: bytes>
Modified: incubator/kafka/trunk/clients/php/src/lib/Kafka/Message.php
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/php/src/lib/Kafka/Message.php?rev=1185774&r1=1185773&r2=1185774&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/php/src/lib/Kafka/Message.php (original)
+++ incubator/kafka/trunk/clients/php/src/lib/Kafka/Message.php Tue Oct 18 17:57:29 2011
@@ -14,6 +14,7 @@
/**
* A message. The format of an N byte message is the following:
* 1 byte "magic" identifier to allow format changes
+ * 1 byte compression-attribute
* 4 byte CRC32 of the payload
* N - 5 byte payload
*
@@ -25,15 +26,6 @@
*/
class Kafka_Message
{
- /*
- private $currentMagicValue = Kafka_Encoder::CURRENT_MAGIC_VALUE;
- private $magicOffset = 0;
- private $magicLength = 1;
- private $crcOffset = 1; // MagicOffset + MagicLength
- private $crcLength = 4;
- private $payloadOffset = 5; // CrcOffset + CrcLength
- private $headerSize = 5; // PayloadOffset
- */
/**
* @var string
@@ -46,6 +38,11 @@ class Kafka_Message
private $size = 0;
/**
+ * @var integer
+ */
+ private $compression = 0;
+
+ /**
* @var string
*/
private $crc = false;
@@ -56,10 +53,12 @@ class Kafka_Message
* @param string $data Message payload
*/
public function __construct($data) {
- $this->payload = substr($data, 5);
+ $this->payload = substr($data, 6);
+ $this->compression = substr($data,1,1);
$this->crc = crc32($this->payload);
$this->size = strlen($this->payload);
}
+
/**
* Encode a message
@@ -121,7 +120,7 @@ class Kafka_Message
* @return string
*/
public function __toString() {
- return 'message(magic = ' . Kafka_Encoder::CURRENT_MAGIC_VALUE . ', crc = ' . $this->crc .
- ', payload = ' . $this->payload . ')';
+ return 'message(magic = ' . Kafka_Encoder::CURRENT_MAGIC_VALUE . ', compression = ' . $this->compression .
+ ', crc = ' . $this->crc . ', payload = ' . $this->payload . ')';
}
}
Modified: incubator/kafka/trunk/clients/php/src/lib/Kafka/Producer.php
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/php/src/lib/Kafka/Producer.php?rev=1185774&r1=1185773&r2=1185774&view=diff
==============================================================================
--- incubator/kafka/trunk/clients/php/src/lib/Kafka/Producer.php (original)
+++ incubator/kafka/trunk/clients/php/src/lib/Kafka/Producer.php Tue Oct 18 17:57:29 2011
@@ -43,6 +43,11 @@ class Kafka_Producer
protected $port;
/**
+ * @var integer
+ */
+ protected $compression;
+
+ /**
* Constructor
*
* @param integer $host Host
@@ -52,6 +57,7 @@ class Kafka_Producer
$this->request_key = 0;
$this->host = $host;
$this->port = $port;
+ $this->compression = 0;
}
/**
@@ -91,7 +97,7 @@ class Kafka_Producer
*/
public function send(array $messages, $topic, $partition = 0xFFFFFFFF) {
$this->connect();
- return fwrite($this->conn, Kafka_Encoder::encode_produce_request($topic, $partition, $messages));
+ return fwrite($this->conn, Kafka_Encoder::encode_produce_request($topic, $partition, $messages, $this->compression));
}
/**