Posted to commits@eagle.apache.org by yo...@apache.org on 2016/01/13 02:08:11 UTC
[4/7] incubator-eagle git commit: EAGLE-120 EAGLE-100 initial system and hadoop metric

initial system and hadoop metric

https://issues.apache.org/jira/browse/EAGLE-120

Author: qingwen220 <qingwzhao@ebay.com>
Reviewer: yonzhang2012 <yonzhang2012@apache.org>
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/db2bbf91/eagle-external/hadoop_jmx_collector/lib/kafka-python/servers/0.8.1.1/resources/kafka.properties
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/lib/kafka-python/servers/0.8.1.1/resources/kafka.properties b/eagle-external/hadoop_jmx_collector/lib/kafka-python/servers/0.8.1.1/resources/kafka.properties
new file mode 100644
index 0000000..a638f39
--- /dev/null
+++ b/eagle-external/hadoop_jmx_collector/lib/kafka-python/servers/0.8.1.1/resources/kafka.properties
@@ -0,0 +1,118 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# see kafka.server.KafkaConfig for additional details and defaults
+
+############################# Server Basics #############################
+
+# The id of the broker. This must be set to a unique integer for each broker.
+broker.id={broker_id}
+
+############################# Socket Server Settings #############################
+
+# The port the socket server listens on
+port={port}
+
+# Hostname the broker will bind to. If not set, the server will bind to all interfaces
+host.name={host}
+
+# Hostname the broker will advertise to producers and consumers. If not set, it uses the
+# value for "host.name" if configured. Otherwise, it will use the value returned from
+# java.net.InetAddress.getCanonicalHostName().
+#advertised.host.name=<hostname routable by clients>
+
+# The port to publish to ZooKeeper for clients to use. If this is not set,
+# it will publish the same port that the broker binds to.
+#advertised.port=<port accessible by clients>
+
+# The number of threads handling network requests
+num.network.threads=2
+
+# The number of threads doing disk I/O
+num.io.threads=8
+
+# The send buffer (SO_SNDBUF) used by the socket server
+socket.send.buffer.bytes=1048576
+
+# The receive buffer (SO_RCVBUF) used by the socket server
+socket.receive.buffer.bytes=1048576
+
+# The maximum size of a request that the socket server will accept (protection against OOM)
+socket.request.max.bytes=104857600
+
+
+############################# Log Basics #############################
+
+# A comma separated list of directories under which to store log files
+log.dirs={tmp_dir}/data
+
+# The default number of log partitions per topic. More partitions allow greater
+# parallelism for consumption, but this will also result in more files across
+# the brokers.
+num.partitions={partitions}
+default.replication.factor={replicas}
+
+############################# Log Flush Policy #############################
+
+# Messages are immediately written to the filesystem but by default we only fsync() to sync
+# the OS cache lazily. The following configurations control the flush of data to disk.
+# There are a few important trade-offs here:
+# 1. Durability: Unflushed data may be lost if you are not using replication.
+# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
+# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
+# The settings below allow one to configure the flush policy to flush data after a period of time or
+# every N messages (or both). This can be done globally and overridden on a per-topic basis.
+
+# The number of messages to accept before forcing a flush of data to disk
+#log.flush.interval.messages=10000
+
+# The maximum amount of time a message can sit in a log before we force a flush
+#log.flush.interval.ms=1000
+
+############################# Log Retention Policy #############################
+
+# The following configurations control the disposal of log segments. The policy can
+# be set to delete segments after a period of time, or after a given size has accumulated.
+# A segment will be deleted whenever *either* of these criteria is met. Deletion always happens
+# from the end of the log.
+
+# The minimum age of a log file to be eligible for deletion
+log.retention.hours=168
+
+# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
+# segments don't drop below log.retention.bytes.
+#log.retention.bytes=1073741824
+
+# The maximum size of a log segment file. When this size is reached a new log segment will be created.
+log.segment.bytes=536870912
+
+# The interval at which log segments are checked to see if they can be deleted according
+# to the retention policies
+log.retention.check.interval.ms=60000
+
+# By default the log cleaner is disabled and the log retention policy will default to just delete segments after their retention expires.
+# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.
+log.cleaner.enable=false
+
+############################# Zookeeper #############################
+
+# Zookeeper connection string (see zookeeper docs for details).
+# This is a comma separated list of host:port pairs, each corresponding to a zk
+# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
+# You can also append an optional chroot string to the urls to specify the
+# root directory for all kafka znodes.
+zookeeper.connect={zk_host}:{zk_port}/{zk_chroot}
+
+# Timeout in ms for connecting to zookeeper
+zookeeper.connection.timeout.ms=1000000
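
Note that {broker_id}, {port}, {host}, {tmp_dir}, {partitions}, {replicas} and the {zk_*} tokens in this file are not literal Kafka settings; they are Python str.format placeholders that the test fixtures added later in this commit fill in when rendering a per-test broker configuration. A minimal sketch of that rendering step (the binding values are illustrative only, and "kafka.properties" is assumed to be this template in the working directory):

    import tempfile

    def render_template(source_file, target_file, binding):
        # Same approach as Fixture.render_template() further down in this commit:
        # read the template and fill every {placeholder} from the binding dict.
        with open(source_file, "r") as handle:
            template = handle.read()
        with open(target_file, "w") as handle:
            handle.write(template.format(**binding))

    tmp_dir = tempfile.mkdtemp()
    binding = {
        "broker_id": 0, "port": 9092, "host": "127.0.0.1", "tmp_dir": tmp_dir,
        "partitions": 2, "replicas": 1,
        "zk_host": "127.0.0.1", "zk_port": 2181, "zk_chroot": "kafka-python_test",
    }
    render_template("kafka.properties", tmp_dir + "/kafka.properties", binding)
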
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/db2bbf91/eagle-external/hadoop_jmx_collector/lib/kafka-python/servers/0.8.1.1/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/lib/kafka-python/servers/0.8.1.1/resources/log4j.properties b/eagle-external/hadoop_jmx_collector/lib/kafka-python/servers/0.8.1.1/resources/log4j.properties
new file mode 100644
index 0000000..f863b3b
--- /dev/null
+++ b/eagle-external/hadoop_jmx_collector/lib/kafka-python/servers/0.8.1.1/resources/log4j.properties
@@ -0,0 +1,24 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+log4j.rootLogger=INFO, stdout
+
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n
+
+log4j.logger.kafka=DEBUG, stdout
+log4j.logger.org.I0Itec.zkclient.ZkClient=INFO, stdout
+log4j.logger.org.apache.zookeeper=INFO, stdout
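
This log4j configuration is what the test fixtures point the spawned broker and ZooKeeper processes at through KAFKA_LOG4J_OPTS, so their output lands on stdout where SpawnedService can scan it. A small sketch of how that environment is assembled, mirroring Fixture.kafka_run_class_env() later in this commit (the path argument is illustrative):

    import os

    def kafka_run_class_env(log4j_properties_path):
        # Copy the current environment and point log4j at the test resource
        # so all broker/ZooKeeper logging goes to the captured stdout.
        env = os.environ.copy()
        env['KAFKA_LOG4J_OPTS'] = "-Dlog4j.configuration=file:%s" % log4j_properties_path
        return env
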
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/db2bbf91/eagle-external/hadoop_jmx_collector/lib/kafka-python/servers/0.8.1.1/resources/zookeeper.properties
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/lib/kafka-python/servers/0.8.1.1/resources/zookeeper.properties b/eagle-external/hadoop_jmx_collector/lib/kafka-python/servers/0.8.1.1/resources/zookeeper.properties
new file mode 100644
index 0000000..e3fd097
--- /dev/null
+++ b/eagle-external/hadoop_jmx_collector/lib/kafka-python/servers/0.8.1.1/resources/zookeeper.properties
@@ -0,0 +1,21 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# the directory where the snapshot is stored.
+dataDir={tmp_dir}
+# the port at which the clients will connect
+clientPort={port}
+clientPortAddress={host}
+# disable the per-ip limit on the number of connections since this is a non-production config
+maxClientCnxns=0
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/db2bbf91/eagle-external/hadoop_jmx_collector/lib/kafka-python/servers/0.8.1/resources/kafka.properties
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/lib/kafka-python/servers/0.8.1/resources/kafka.properties b/eagle-external/hadoop_jmx_collector/lib/kafka-python/servers/0.8.1/resources/kafka.properties
new file mode 100644
index 0000000..5d47520
--- /dev/null
+++ b/eagle-external/hadoop_jmx_collector/lib/kafka-python/servers/0.8.1/resources/kafka.properties
@@ -0,0 +1,59 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+############################# Server Basics #############################
+
+broker.id={broker_id}
+
+############################# Socket Server Settings #############################
+
+port={port}
+host.name={host}
+
+num.network.threads=2
+num.io.threads=2
+
+socket.send.buffer.bytes=1048576
+socket.receive.buffer.bytes=1048576
+socket.request.max.bytes=104857600
+
+############################# Log Basics #############################
+
+log.dirs={tmp_dir}/data
+num.partitions={partitions}
+default.replication.factor={replicas}
+
+############################# Log Flush Policy #############################
+
+log.flush.interval.messages=10000
+log.flush.interval.ms=1000
+
+############################# Log Retention Policy #############################
+
+log.retention.hours=168
+log.segment.bytes=536870912
+log.retention.check.interval.ms=60000
+log.cleanup.interval.mins=1
+log.cleaner.enable=false
+
+############################# Zookeeper #############################
+
+# Zookeeper connection string (see zookeeper docs for details).
+# This is a comma separated list of host:port pairs, each corresponding to a zk
+# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
+# You can also append an optional chroot string to the urls to specify the
+# root directory for all kafka znodes.
+zookeeper.connect={zk_host}:{zk_port}/{zk_chroot}
+zookeeper.connection.timeout.ms=1000000
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/db2bbf91/eagle-external/hadoop_jmx_collector/lib/kafka-python/servers/0.8.1/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/lib/kafka-python/servers/0.8.1/resources/log4j.properties b/eagle-external/hadoop_jmx_collector/lib/kafka-python/servers/0.8.1/resources/log4j.properties
new file mode 100644
index 0000000..f863b3b
--- /dev/null
+++ b/eagle-external/hadoop_jmx_collector/lib/kafka-python/servers/0.8.1/resources/log4j.properties
@@ -0,0 +1,24 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+log4j.rootLogger=INFO, stdout
+
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n
+
+log4j.logger.kafka=DEBUG, stdout
+log4j.logger.org.I0Itec.zkclient.ZkClient=INFO, stdout
+log4j.logger.org.apache.zookeeper=INFO, stdout
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/db2bbf91/eagle-external/hadoop_jmx_collector/lib/kafka-python/servers/0.8.1/resources/zookeeper.properties
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/lib/kafka-python/servers/0.8.1/resources/zookeeper.properties b/eagle-external/hadoop_jmx_collector/lib/kafka-python/servers/0.8.1/resources/zookeeper.properties
new file mode 100644
index 0000000..68e1ef9
--- /dev/null
+++ b/eagle-external/hadoop_jmx_collector/lib/kafka-python/servers/0.8.1/resources/zookeeper.properties
@@ -0,0 +1,19 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+dataDir={tmp_dir}
+clientPortAddress={host}
+clientPort={port}
+maxClientCnxns=0
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/db2bbf91/eagle-external/hadoop_jmx_collector/lib/kafka-python/servers/0.8.2.0/resources/kafka.properties
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/lib/kafka-python/servers/0.8.2.0/resources/kafka.properties b/eagle-external/hadoop_jmx_collector/lib/kafka-python/servers/0.8.2.0/resources/kafka.properties
new file mode 100644
index 0000000..a638f39
--- /dev/null
+++ b/eagle-external/hadoop_jmx_collector/lib/kafka-python/servers/0.8.2.0/resources/kafka.properties
@@ -0,0 +1,118 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# see kafka.server.KafkaConfig for additional details and defaults
+
+############################# Server Basics #############################
+
+# The id of the broker. This must be set to a unique integer for each broker.
+broker.id={broker_id}
+
+############################# Socket Server Settings #############################
+
+# The port the socket server listens on
+port={port}
+
+# Hostname the broker will bind to. If not set, the server will bind to all interfaces
+host.name={host}
+
+# Hostname the broker will advertise to producers and consumers. If not set, it uses the
+# value for "host.name" if configured. Otherwise, it will use the value returned from
+# java.net.InetAddress.getCanonicalHostName().
+#advertised.host.name=<hostname routable by clients>
+
+# The port to publish to ZooKeeper for clients to use. If this is not set,
+# it will publish the same port that the broker binds to.
+#advertised.port=<port accessible by clients>
+
+# The number of threads handling network requests
+num.network.threads=2
+
+# The number of threads doing disk I/O
+num.io.threads=8
+
+# The send buffer (SO_SNDBUF) used by the socket server
+socket.send.buffer.bytes=1048576
+
+# The receive buffer (SO_RCVBUF) used by the socket server
+socket.receive.buffer.bytes=1048576
+
+# The maximum size of a request that the socket server will accept (protection against OOM)
+socket.request.max.bytes=104857600
+
+
+############################# Log Basics #############################
+
+# A comma separated list of directories under which to store log files
+log.dirs={tmp_dir}/data
+
+# The default number of log partitions per topic. More partitions allow greater
+# parallelism for consumption, but this will also result in more files across
+# the brokers.
+num.partitions={partitions}
+default.replication.factor={replicas}
+
+############################# Log Flush Policy #############################
+
+# Messages are immediately written to the filesystem but by default we only fsync() to sync
+# the OS cache lazily. The following configurations control the flush of data to disk.
+# There are a few important trade-offs here:
+# 1. Durability: Unflushed data may be lost if you are not using replication.
+# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
+# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
+# The settings below allow one to configure the flush policy to flush data after a period of time or
+# every N messages (or both). This can be done globally and overridden on a per-topic basis.
+
+# The number of messages to accept before forcing a flush of data to disk
+#log.flush.interval.messages=10000
+
+# The maximum amount of time a message can sit in a log before we force a flush
+#log.flush.interval.ms=1000
+
+############################# Log Retention Policy #############################
+
+# The following configurations control the disposal of log segments. The policy can
+# be set to delete segments after a period of time, or after a given size has accumulated.
+# A segment will be deleted whenever *either* of these criteria is met. Deletion always happens
+# from the end of the log.
+
+# The minimum age of a log file to be eligible for deletion
+log.retention.hours=168
+
+# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
+# segments don't drop below log.retention.bytes.
+#log.retention.bytes=1073741824
+
+# The maximum size of a log segment file. When this size is reached a new log segment will be created.
+log.segment.bytes=536870912
+
+# The interval at which log segments are checked to see if they can be deleted according
+# to the retention policies
+log.retention.check.interval.ms=60000
+
+# By default the log cleaner is disabled and the log retention policy will default to just delete segments after their retention expires.
+# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.
+log.cleaner.enable=false
+
+############################# Zookeeper #############################
+
+# Zookeeper connection string (see zookeeper docs for details).
+# This is a comma separated list of host:port pairs, each corresponding to a zk
+# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
+# You can also append an optional chroot string to the urls to specify the
+# root directory for all kafka znodes.
+zookeeper.connect={zk_host}:{zk_port}/{zk_chroot}
+
+# Timeout in ms for connecting to zookeeper
+zookeeper.connection.timeout.ms=1000000
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/db2bbf91/eagle-external/hadoop_jmx_collector/lib/kafka-python/servers/0.8.2.0/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/lib/kafka-python/servers/0.8.2.0/resources/log4j.properties b/eagle-external/hadoop_jmx_collector/lib/kafka-python/servers/0.8.2.0/resources/log4j.properties
new file mode 100644
index 0000000..f863b3b
--- /dev/null
+++ b/eagle-external/hadoop_jmx_collector/lib/kafka-python/servers/0.8.2.0/resources/log4j.properties
@@ -0,0 +1,24 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+log4j.rootLogger=INFO, stdout
+
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n
+
+log4j.logger.kafka=DEBUG, stdout
+log4j.logger.org.I0Itec.zkclient.ZkClient=INFO, stdout
+log4j.logger.org.apache.zookeeper=INFO, stdout
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/db2bbf91/eagle-external/hadoop_jmx_collector/lib/kafka-python/servers/0.8.2.0/resources/zookeeper.properties
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/lib/kafka-python/servers/0.8.2.0/resources/zookeeper.properties b/eagle-external/hadoop_jmx_collector/lib/kafka-python/servers/0.8.2.0/resources/zookeeper.properties
new file mode 100644
index 0000000..e3fd097
--- /dev/null
+++ b/eagle-external/hadoop_jmx_collector/lib/kafka-python/servers/0.8.2.0/resources/zookeeper.properties
@@ -0,0 +1,21 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# the directory where the snapshot is stored.
+dataDir={tmp_dir}
+# the port at which the clients will connect
+clientPort={port}
+clientPortAddress={host}
+# disable the per-ip limit on the number of connections since this is a non-production config
+maxClientCnxns=0
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/db2bbf91/eagle-external/hadoop_jmx_collector/lib/kafka-python/servers/trunk/resources/kafka.properties
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/lib/kafka-python/servers/trunk/resources/kafka.properties b/eagle-external/hadoop_jmx_collector/lib/kafka-python/servers/trunk/resources/kafka.properties
new file mode 100644
index 0000000..a638f39
--- /dev/null
+++ b/eagle-external/hadoop_jmx_collector/lib/kafka-python/servers/trunk/resources/kafka.properties
@@ -0,0 +1,118 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# see kafka.server.KafkaConfig for additional details and defaults
+
+############################# Server Basics #############################
+
+# The id of the broker. This must be set to a unique integer for each broker.
+broker.id={broker_id}
+
+############################# Socket Server Settings #############################
+
+# The port the socket server listens on
+port={port}
+
+# Hostname the broker will bind to. If not set, the server will bind to all interfaces
+host.name={host}
+
+# Hostname the broker will advertise to producers and consumers. If not set, it uses the
+# value for "host.name" if configured. Otherwise, it will use the value returned from
+# java.net.InetAddress.getCanonicalHostName().
+#advertised.host.name=<hostname routable by clients>
+
+# The port to publish to ZooKeeper for clients to use. If this is not set,
+# it will publish the same port that the broker binds to.
+#advertised.port=<port accessible by clients>
+
+# The number of threads handling network requests
+num.network.threads=2
+
+# The number of threads doing disk I/O
+num.io.threads=8
+
+# The send buffer (SO_SNDBUF) used by the socket server
+socket.send.buffer.bytes=1048576
+
+# The receive buffer (SO_RCVBUF) used by the socket server
+socket.receive.buffer.bytes=1048576
+
+# The maximum size of a request that the socket server will accept (protection against OOM)
+socket.request.max.bytes=104857600
+
+
+############################# Log Basics #############################
+
+# A comma separated list of directories under which to store log files
+log.dirs={tmp_dir}/data
+
+# The default number of log partitions per topic. More partitions allow greater
+# parallelism for consumption, but this will also result in more files across
+# the brokers.
+num.partitions={partitions}
+default.replication.factor={replicas}
+
+############################# Log Flush Policy #############################
+
+# Messages are immediately written to the filesystem but by default we only fsync() to sync
+# the OS cache lazily. The following configurations control the flush of data to disk.
+# There are a few important trade-offs here:
+# 1. Durability: Unflushed data may be lost if you are not using replication.
+# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
+# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
+# The settings below allow one to configure the flush policy to flush data after a period of time or
+# every N messages (or both). This can be done globally and overridden on a per-topic basis.
+
+# The number of messages to accept before forcing a flush of data to disk
+#log.flush.interval.messages=10000
+
+# The maximum amount of time a message can sit in a log before we force a flush
+#log.flush.interval.ms=1000
+
+############################# Log Retention Policy #############################
+
+# The following configurations control the disposal of log segments. The policy can
+# be set to delete segments after a period of time, or after a given size has accumulated.
+# A segment will be deleted whenever *either* of these criteria is met. Deletion always happens
+# from the end of the log.
+
+# The minimum age of a log file to be eligible for deletion
+log.retention.hours=168
+
+# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
+# segments don't drop below log.retention.bytes.
+#log.retention.bytes=1073741824
+
+# The maximum size of a log segment file. When this size is reached a new log segment will be created.
+log.segment.bytes=536870912
+
+# The interval at which log segments are checked to see if they can be deleted according
+# to the retention policies
+log.retention.check.interval.ms=60000
+
+# By default the log cleaner is disabled and the log retention policy will default to just delete segments after their retention expires.
+# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.
+log.cleaner.enable=false
+
+############################# Zookeeper #############################
+
+# Zookeeper connection string (see zookeeper docs for details).
+# This is a comma separated list of host:port pairs, each corresponding to a zk
+# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
+# You can also append an optional chroot string to the urls to specify the
+# root directory for all kafka znodes.
+zookeeper.connect={zk_host}:{zk_port}/{zk_chroot}
+
+# Timeout in ms for connecting to zookeeper
+zookeeper.connection.timeout.ms=1000000
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/db2bbf91/eagle-external/hadoop_jmx_collector/lib/kafka-python/servers/trunk/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/lib/kafka-python/servers/trunk/resources/log4j.properties b/eagle-external/hadoop_jmx_collector/lib/kafka-python/servers/trunk/resources/log4j.properties
new file mode 100644
index 0000000..f863b3b
--- /dev/null
+++ b/eagle-external/hadoop_jmx_collector/lib/kafka-python/servers/trunk/resources/log4j.properties
@@ -0,0 +1,24 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+log4j.rootLogger=INFO, stdout
+
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n
+
+log4j.logger.kafka=DEBUG, stdout
+log4j.logger.org.I0Itec.zkclient.ZkClient=INFO, stdout
+log4j.logger.org.apache.zookeeper=INFO, stdout
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/db2bbf91/eagle-external/hadoop_jmx_collector/lib/kafka-python/servers/trunk/resources/zookeeper.properties
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/lib/kafka-python/servers/trunk/resources/zookeeper.properties b/eagle-external/hadoop_jmx_collector/lib/kafka-python/servers/trunk/resources/zookeeper.properties
new file mode 100644
index 0000000..e3fd097
--- /dev/null
+++ b/eagle-external/hadoop_jmx_collector/lib/kafka-python/servers/trunk/resources/zookeeper.properties
@@ -0,0 +1,21 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# the directory where the snapshot is stored.
+dataDir={tmp_dir}
+# the port at which the clients will connect
+clientPort={port}
+clientPortAddress={host}
+# disable the per-ip limit on the number of connections since this is a non-production config
+maxClientCnxns=0
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/db2bbf91/eagle-external/hadoop_jmx_collector/lib/kafka-python/setup.py
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/lib/kafka-python/setup.py b/eagle-external/hadoop_jmx_collector/lib/kafka-python/setup.py
new file mode 100644
index 0000000..f1c1954
--- /dev/null
+++ b/eagle-external/hadoop_jmx_collector/lib/kafka-python/setup.py
@@ -0,0 +1,70 @@
+import sys
+import os
+from setuptools import setup, Command
+
+with open('VERSION', 'r') as v:
+ __version__ = v.read().rstrip()
+
+
+class Tox(Command):
+
+ user_options = []
+
+ def initialize_options(self):
+ pass
+
+ def finalize_options(self):
+ pass
+
+ @classmethod
+ def run(cls):
+ import tox
+ sys.exit(tox.cmdline([]))
+
+
+test_require = ['tox', 'mock']
+if sys.version_info < (2, 7):
+ test_require.append('unittest2')
+
+here = os.path.abspath(os.path.dirname(__file__))
+
+with open(os.path.join(here, 'README.rst')) as f:
+ README = f.read()
+
+setup(
+ name="kafka-python",
+ version=__version__,
+
+ tests_require=test_require,
+ cmdclass={"test": Tox},
+
+ packages=[
+ "kafka",
+ "kafka.consumer",
+ "kafka.partitioner",
+ "kafka.producer",
+ ],
+
+ author="David Arthur",
+ author_email="mumrah@gmail.com",
+ url="https://github.com/mumrah/kafka-python",
+ license="Apache License 2.0",
+ description="Pure Python client for Apache Kafka",
+ long_description=README,
+ keywords="apache kafka",
+ install_requires=['six'],
+ classifiers=[
+ "Development Status :: 4 - Beta",
+ "Intended Audience :: Developers",
+ "License :: OSI Approved :: Apache Software License",
+ "Programming Language :: Python",
+ "Programming Language :: Python :: 2",
+ "Programming Language :: Python :: 2.6",
+ "Programming Language :: Python :: 2.7",
+ "Programming Language :: Python :: 3",
+ "Programming Language :: Python :: 3.3",
+ "Programming Language :: Python :: 3.4",
+ "Programming Language :: Python :: Implementation :: PyPy",
+ "Topic :: Software Development :: Libraries :: Python Modules",
+ ]
+)
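
setup.py packages the vendored client as the standard kafka-python distribution; the custom test command simply shells out to tox. Once installed (for example with pip from this directory), a broker can be reached with nothing more than the KafkaClient calls that the bundled tests below exercise. A minimal, hedged usage sketch (broker address and topic name are illustrative):

    from kafka import KafkaClient

    client = KafkaClient(hosts='localhost:9092')                   # one or more host:port pairs
    client.ensure_topic_exists('hadoop_jmx_metrics', timeout=30)   # illustrative topic name
    assert client.has_metadata_for_topic('hadoop_jmx_metrics')
    client.close()
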
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/db2bbf91/eagle-external/hadoop_jmx_collector/lib/kafka-python/test/__init__.py
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/lib/kafka-python/test/__init__.py b/eagle-external/hadoop_jmx_collector/lib/kafka-python/test/__init__.py
new file mode 100644
index 0000000..c4d1e80
--- /dev/null
+++ b/eagle-external/hadoop_jmx_collector/lib/kafka-python/test/__init__.py
@@ -0,0 +1,6 @@
+import sys
+
+if sys.version_info < (2, 7):
+ import unittest2 as unittest
+else:
+ import unittest
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/db2bbf91/eagle-external/hadoop_jmx_collector/lib/kafka-python/test/fixtures.py
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/lib/kafka-python/test/fixtures.py b/eagle-external/hadoop_jmx_collector/lib/kafka-python/test/fixtures.py
new file mode 100644
index 0000000..3c496fd
--- /dev/null
+++ b/eagle-external/hadoop_jmx_collector/lib/kafka-python/test/fixtures.py
@@ -0,0 +1,236 @@
+import logging
+import os
+import os.path
+import shutil
+import subprocess
+import tempfile
+from six.moves import urllib
+import uuid
+
+from six.moves.urllib.parse import urlparse # pylint: disable-msg=E0611
+from test.service import ExternalService, SpawnedService
+from test.testutil import get_open_port
+
+class Fixture(object):
+ kafka_version = os.environ.get('KAFKA_VERSION', '0.8.0')
+ scala_version = os.environ.get("SCALA_VERSION", '2.8.0')
+ project_root = os.environ.get('PROJECT_ROOT', os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
+ kafka_root = os.environ.get("KAFKA_ROOT", os.path.join(project_root, 'servers', kafka_version, "kafka-bin"))
+ ivy_root = os.environ.get('IVY_ROOT', os.path.expanduser("~/.ivy2/cache"))
+
+ @classmethod
+ def download_official_distribution(cls,
+ kafka_version=None,
+ scala_version=None,
+ output_dir=None):
+ if not kafka_version:
+ kafka_version = cls.kafka_version
+ if not scala_version:
+ scala_version = cls.scala_version
+ if not output_dir:
+ output_dir = os.path.join(cls.project_root, 'servers', 'dist')
+
+ distfile = 'kafka_%s-%s' % (scala_version, kafka_version,)
+ url_base = 'https://archive.apache.org/dist/kafka/%s/' % (kafka_version,)
+ output_file = os.path.join(output_dir, distfile + '.tgz')
+
+ if os.path.isfile(output_file):
+ logging.info("Found file already on disk: %s", output_file)
+ return output_file
+
+ # New tarballs are .tgz, older ones are sometimes .tar.gz
+ try:
+ url = url_base + distfile + '.tgz'
+ logging.info("Attempting to download %s", url)
+ response = urllib.request.urlopen(url)
+ except urllib.error.HTTPError:
+ logging.exception("HTTP Error")
+ url = url_base + distfile + '.tar.gz'
+ logging.info("Attempting to download %s", url)
+ response = urllib.request.urlopen(url)
+
+ logging.info("Saving distribution file to %s", output_file)
+ with open(output_file, 'wb') as output_file_fd:
+ output_file_fd.write(response.read())
+
+ return output_file
+
+ @classmethod
+ def test_resource(cls, filename):
+ return os.path.join(cls.project_root, "servers", cls.kafka_version, "resources", filename)
+
+ @classmethod
+ def kafka_run_class_args(cls, *args):
+ result = [os.path.join(cls.kafka_root, 'bin', 'kafka-run-class.sh')]
+ result.extend(args)
+ return result
+
+ @classmethod
+ def kafka_run_class_env(cls):
+ env = os.environ.copy()
+ env['KAFKA_LOG4J_OPTS'] = "-Dlog4j.configuration=file:%s" % cls.test_resource("log4j.properties")
+ return env
+
+ @classmethod
+ def render_template(cls, source_file, target_file, binding):
+ with open(source_file, "r") as handle:
+ template = handle.read()
+ with open(target_file, "w") as handle:
+ handle.write(template.format(**binding))
+
+
+class ZookeeperFixture(Fixture):
+ @classmethod
+ def instance(cls):
+ if "ZOOKEEPER_URI" in os.environ:
+ parse = urlparse(os.environ["ZOOKEEPER_URI"])
+ (host, port) = (parse.hostname, parse.port)
+ fixture = ExternalService(host, port)
+ else:
+ (host, port) = ("127.0.0.1", get_open_port())
+ fixture = cls(host, port)
+
+ fixture.open()
+ return fixture
+
+ def __init__(self, host, port):
+ self.host = host
+ self.port = port
+
+ self.tmp_dir = None
+ self.child = None
+
+ def out(self, message):
+ logging.info("*** Zookeeper [%s:%d]: %s", self.host, self.port, message)
+
+ def open(self):
+ self.tmp_dir = tempfile.mkdtemp()
+ self.out("Running local instance...")
+ logging.info(" host = %s", self.host)
+ logging.info(" port = %s", self.port)
+ logging.info(" tmp_dir = %s", self.tmp_dir)
+
+ # Generate configs
+ template = self.test_resource("zookeeper.properties")
+ properties = os.path.join(self.tmp_dir, "zookeeper.properties")
+ self.render_template(template, properties, vars(self))
+
+ # Configure Zookeeper child process
+ args = self.kafka_run_class_args("org.apache.zookeeper.server.quorum.QuorumPeerMain", properties)
+ env = self.kafka_run_class_env()
+ self.child = SpawnedService(args, env)
+
+ # Party!
+ self.out("Starting...")
+ self.child.start()
+ self.child.wait_for(r"binding to port")
+ self.out("Done!")
+
+ def close(self):
+ self.out("Stopping...")
+ self.child.stop()
+ self.child = None
+ self.out("Done!")
+ shutil.rmtree(self.tmp_dir)
+
+
+class KafkaFixture(Fixture):
+ @classmethod
+ def instance(cls, broker_id, zk_host, zk_port, zk_chroot=None, replicas=1, partitions=2):
+ if zk_chroot is None:
+ zk_chroot = "kafka-python_" + str(uuid.uuid4()).replace("-", "_")
+ if "KAFKA_URI" in os.environ:
+ parse = urlparse(os.environ["KAFKA_URI"])
+ (host, port) = (parse.hostname, parse.port)
+ fixture = ExternalService(host, port)
+ else:
+ (host, port) = ("127.0.0.1", get_open_port())
+ fixture = KafkaFixture(host, port, broker_id, zk_host, zk_port, zk_chroot, replicas, partitions)
+ fixture.open()
+ return fixture
+
+ def __init__(self, host, port, broker_id, zk_host, zk_port, zk_chroot, replicas=1, partitions=2):
+ self.host = host
+ self.port = port
+
+ self.broker_id = broker_id
+
+ self.zk_host = zk_host
+ self.zk_port = zk_port
+ self.zk_chroot = zk_chroot
+
+ self.replicas = replicas
+ self.partitions = partitions
+
+ self.tmp_dir = None
+ self.child = None
+ self.running = False
+
+ def out(self, message):
+ logging.info("*** Kafka [%s:%d]: %s", self.host, self.port, message)
+
+ def open(self):
+ if self.running:
+ self.out("Instance already running")
+ return
+
+ self.tmp_dir = tempfile.mkdtemp()
+ self.out("Running local instance...")
+ logging.info(" host = %s", self.host)
+ logging.info(" port = %s", self.port)
+ logging.info(" broker_id = %s", self.broker_id)
+ logging.info(" zk_host = %s", self.zk_host)
+ logging.info(" zk_port = %s", self.zk_port)
+ logging.info(" zk_chroot = %s", self.zk_chroot)
+ logging.info(" replicas = %s", self.replicas)
+ logging.info(" partitions = %s", self.partitions)
+ logging.info(" tmp_dir = %s", self.tmp_dir)
+
+ # Create directories
+ os.mkdir(os.path.join(self.tmp_dir, "logs"))
+ os.mkdir(os.path.join(self.tmp_dir, "data"))
+
+ # Generate configs
+ template = self.test_resource("kafka.properties")
+ properties = os.path.join(self.tmp_dir, "kafka.properties")
+ self.render_template(template, properties, vars(self))
+
+ # Configure Kafka child process
+ args = self.kafka_run_class_args("kafka.Kafka", properties)
+ env = self.kafka_run_class_env()
+ self.child = SpawnedService(args, env)
+
+ # Party!
+ self.out("Creating Zookeeper chroot node...")
+ args = self.kafka_run_class_args("org.apache.zookeeper.ZooKeeperMain",
+ "-server", "%s:%d" % (self.zk_host, self.zk_port),
+ "create",
+ "/%s" % self.zk_chroot,
+ "kafka-python")
+ env = self.kafka_run_class_env()
+ proc = subprocess.Popen(args, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+
+ if proc.wait() != 0:
+ self.out("Failed to create Zookeeper chroot node")
+ self.out(proc.stdout.read())
+ self.out(proc.stderr.read())
+ raise RuntimeError("Failed to create Zookeeper chroot node")
+ self.out("Done!")
+
+ self.out("Starting...")
+ self.child.start()
+ self.child.wait_for(r"\[Kafka Server %d\], Started" % self.broker_id)
+ self.out("Done!")
+ self.running = True
+
+ def close(self):
+ if not self.running:
+ self.out("Instance already stopped")
+ return
+
+ self.out("Stopping...")
+ self.child.stop()
+ self.child = None
+ self.out("Done!")
+ shutil.rmtree(self.tmp_dir)
+ self.running = False
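
Taken together, ZookeeperFixture and KafkaFixture let each integration test boot a throwaway ZooKeeper instance and a chrooted Kafka broker from the property templates above. A minimal usage sketch, assuming it runs from the kafka-python source root with KAFKA_VERSION/KAFKA_ROOT pointing at an unpacked Kafka distribution (see the Fixture class for the environment variables it reads):

    from test.fixtures import ZookeeperFixture, KafkaFixture

    zk = ZookeeperFixture.instance()                      # embedded ZooKeeper on a free port
    kafka = KafkaFixture.instance(broker_id=0,
                                  zk_host=zk.host,
                                  zk_port=zk.port)        # broker chrooted under a random znode
    try:
        print("Broker listening on %s:%d" % (kafka.host, kafka.port))
    finally:
        kafka.close()
        zk.close()
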
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/db2bbf91/eagle-external/hadoop_jmx_collector/lib/kafka-python/test/service.py
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/lib/kafka-python/test/service.py b/eagle-external/hadoop_jmx_collector/lib/kafka-python/test/service.py
new file mode 100644
index 0000000..dcd3e68
--- /dev/null
+++ b/eagle-external/hadoop_jmx_collector/lib/kafka-python/test/service.py
@@ -0,0 +1,111 @@
+import logging
+import re
+import select
+import subprocess
+import threading
+import time
+
+__all__ = [
+ 'ExternalService',
+ 'SpawnedService',
+
+]
+
+class ExternalService(object):
+ def __init__(self, host, port):
+ logging.info("Using already running service at %s:%d", host, port)
+ self.host = host
+ self.port = port
+
+ def open(self):
+ pass
+
+ def close(self):
+ pass
+
+
+class SpawnedService(threading.Thread):
+ def __init__(self, args=None, env=None):
+ threading.Thread.__init__(self)
+
+ if args is None:
+ raise TypeError("args parameter is required")
+ self.args = args
+ self.env = env
+ self.captured_stdout = []
+ self.captured_stderr = []
+
+ self.should_die = threading.Event()
+
+ def run(self):
+ self.run_with_handles()
+
+ def run_with_handles(self):
+ self.child = subprocess.Popen(
+ self.args,
+ env=self.env,
+ bufsize=1,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
+ alive = True
+
+ while True:
+ (rds, _, _) = select.select([self.child.stdout, self.child.stderr], [], [], 1)
+
+ if self.child.stdout in rds:
+ line = self.child.stdout.readline()
+ self.captured_stdout.append(line.decode('utf-8'))
+
+ if self.child.stderr in rds:
+ line = self.child.stderr.readline()
+ self.captured_stderr.append(line.decode('utf-8'))
+
+ if self.should_die.is_set():
+ self.child.terminate()
+ alive = False
+
+ poll_results = self.child.poll()
+ if poll_results is not None:
+ if not alive:
+ break
+ else:
+ self.dump_logs()
+ raise RuntimeError("Subprocess has died. Aborting. (args=%s)" % ' '.join(str(x) for x in self.args))
+
+ def dump_logs(self):
+ logging.critical('stderr')
+ for line in self.captured_stderr:
+ logging.critical(line.rstrip())
+
+ logging.critical('stdout')
+ for line in self.captured_stdout:
+ logging.critical(line.rstrip())
+
+ def wait_for(self, pattern, timeout=30):
+ t1 = time.time()
+ while True:
+ t2 = time.time()
+ if t2 - t1 >= timeout:
+ try:
+ self.child.kill()
+ except:
+ logging.exception("Received exception when killing child process")
+ self.dump_logs()
+
+ raise RuntimeError("Waiting for %r timed out after %d seconds" % (pattern, timeout))
+
+ if re.search(pattern, '\n'.join(self.captured_stdout), re.IGNORECASE) is not None:
+ logging.info("Found pattern %r in %d seconds via stdout", pattern, (t2 - t1))
+ return
+ if re.search(pattern, '\n'.join(self.captured_stderr), re.IGNORECASE) is not None:
+ logging.info("Found pattern %r in %d seconds via stderr", pattern, (t2 - t1))
+ return
+ time.sleep(0.1)
+
+ def start(self):
+ threading.Thread.start(self)
+
+ def stop(self):
+ self.should_die.set()
+ self.join()
+
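
SpawnedService is the building block both fixtures rely on: it runs a child process on a background thread, captures its stdout/stderr, and lets the caller block until a given log pattern appears. A self-contained sketch with a throwaway child process standing in for Kafka/ZooKeeper (the real fixtures pass kafka-run-class.sh arguments instead):

    from test.service import SpawnedService

    # -u keeps the child's stdout unbuffered so the line reaches the pipe promptly;
    # the sleep keeps the process alive until stop() terminates it.
    svc = SpawnedService(args=[
        "python", "-u", "-c",
        "import time; print('binding to port 2181'); time.sleep(60)",
    ])
    svc.start()
    svc.wait_for(r"binding to port", timeout=10)   # same pattern ZookeeperFixture waits for
    svc.stop()                                     # sets should_die and joins the thread
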
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/db2bbf91/eagle-external/hadoop_jmx_collector/lib/kafka-python/test/test_client.py
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/lib/kafka-python/test/test_client.py b/eagle-external/hadoop_jmx_collector/lib/kafka-python/test/test_client.py
new file mode 100644
index 0000000..c522d9a
--- /dev/null
+++ b/eagle-external/hadoop_jmx_collector/lib/kafka-python/test/test_client.py
@@ -0,0 +1,403 @@
+import socket
+from time import sleep
+
+from mock import ANY, MagicMock, patch
+import six
+from . import unittest
+
+from kafka import KafkaClient
+from kafka.common import (
+ ProduceRequest, MetadataResponse,
+ BrokerMetadata, TopicMetadata, PartitionMetadata,
+ TopicAndPartition, KafkaUnavailableError,
+ LeaderNotAvailableError, UnknownTopicOrPartitionError,
+ KafkaTimeoutError, ConnectionError
+)
+from kafka.conn import KafkaConnection
+from kafka.protocol import KafkaProtocol, create_message
+
+from test.testutil import Timer
+
+NO_ERROR = 0
+UNKNOWN_TOPIC_OR_PARTITION = 3
+NO_LEADER = 5
+
+class TestKafkaClient(unittest.TestCase):
+ def test_init_with_list(self):
+ with patch.object(KafkaClient, 'load_metadata_for_topics'):
+ client = KafkaClient(hosts=['kafka01:9092', 'kafka02:9092', 'kafka03:9092'])
+
+ self.assertEqual(
+ sorted([('kafka01', 9092), ('kafka02', 9092), ('kafka03', 9092)]),
+ sorted(client.hosts))
+
+ def test_init_with_csv(self):
+ with patch.object(KafkaClient, 'load_metadata_for_topics'):
+ client = KafkaClient(hosts='kafka01:9092,kafka02:9092,kafka03:9092')
+
+ self.assertEqual(
+ sorted([('kafka01', 9092), ('kafka02', 9092), ('kafka03', 9092)]),
+ sorted(client.hosts))
+
+ def test_init_with_unicode_csv(self):
+ with patch.object(KafkaClient, 'load_metadata_for_topics'):
+ client = KafkaClient(hosts=u'kafka01:9092,kafka02:9092,kafka03:9092')
+
+ self.assertEqual(
+ sorted([('kafka01', 9092), ('kafka02', 9092), ('kafka03', 9092)]),
+ sorted(client.hosts))
+
+ def test_send_broker_unaware_request_fail(self):
+ 'Tests that call fails when all hosts are unavailable'
+
+ mocked_conns = {
+ ('kafka01', 9092): MagicMock(),
+ ('kafka02', 9092): MagicMock()
+ }
+
+ # inject KafkaConnection side effects
+ mocked_conns[('kafka01', 9092)].send.side_effect = RuntimeError("kafka01 went away (unittest)")
+ mocked_conns[('kafka02', 9092)].send.side_effect = RuntimeError("Kafka02 went away (unittest)")
+
+ def mock_get_conn(host, port):
+ return mocked_conns[(host, port)]
+
+ # patch to avoid making requests before we want it
+ with patch.object(KafkaClient, 'load_metadata_for_topics'):
+ with patch.object(KafkaClient, '_get_conn', side_effect=mock_get_conn):
+ client = KafkaClient(hosts=['kafka01:9092', 'kafka02:9092'])
+
+ req = KafkaProtocol.encode_metadata_request(b'client', 0)
+ with self.assertRaises(KafkaUnavailableError):
+ client._send_broker_unaware_request(payloads=['fake request'],
+ encoder_fn=MagicMock(return_value='fake encoded message'),
+ decoder_fn=lambda x: x)
+
+ for key, conn in six.iteritems(mocked_conns):
+ conn.send.assert_called_with(ANY, 'fake encoded message')
+
+ def test_send_broker_unaware_request(self):
+ 'Tests that call works when at least one of the host is available'
+
+ mocked_conns = {
+ ('kafka01', 9092): MagicMock(),
+ ('kafka02', 9092): MagicMock(),
+ ('kafka03', 9092): MagicMock()
+ }
+ # inject KafkaConnection side effects
+ mocked_conns[('kafka01', 9092)].send.side_effect = RuntimeError("kafka01 went away (unittest)")
+ mocked_conns[('kafka02', 9092)].recv.return_value = 'valid response'
+ mocked_conns[('kafka03', 9092)].send.side_effect = RuntimeError("kafka03 went away (unittest)")
+
+ def mock_get_conn(host, port):
+ return mocked_conns[(host, port)]
+
+ # patch to avoid making requests before we want it
+ with patch.object(KafkaClient, 'load_metadata_for_topics'):
+ with patch.object(KafkaClient, '_get_conn', side_effect=mock_get_conn):
+ with patch.object(KafkaClient, '_next_id', return_value=1):
+ client = KafkaClient(hosts='kafka01:9092,kafka02:9092')
+
+ resp = client._send_broker_unaware_request(payloads=['fake request'],
+ encoder_fn=MagicMock(),
+ decoder_fn=lambda x: x)
+
+ self.assertEqual('valid response', resp)
+ mocked_conns[('kafka02', 9092)].recv.assert_called_with(1)
+
+ @patch('kafka.client.KafkaConnection')
+ @patch('kafka.client.KafkaProtocol')
+ def test_load_metadata(self, protocol, conn):
+
+ conn.recv.return_value = 'response' # anything but None
+
+ brokers = [
+ BrokerMetadata(0, 'broker_1', 4567),
+ BrokerMetadata(1, 'broker_2', 5678)
+ ]
+
+ topics = [
+ TopicMetadata('topic_1', NO_ERROR, [
+ PartitionMetadata('topic_1', 0, 1, [1, 2], [1, 2], NO_ERROR)
+ ]),
+ TopicMetadata('topic_noleader', NO_ERROR, [
+ PartitionMetadata('topic_noleader', 0, -1, [], [],
+ NO_LEADER),
+ PartitionMetadata('topic_noleader', 1, -1, [], [],
+ NO_LEADER),
+ ]),
+ TopicMetadata('topic_no_partitions', NO_LEADER, []),
+ TopicMetadata('topic_unknown', UNKNOWN_TOPIC_OR_PARTITION, []),
+ TopicMetadata('topic_3', NO_ERROR, [
+ PartitionMetadata('topic_3', 0, 0, [0, 1], [0, 1], NO_ERROR),
+ PartitionMetadata('topic_3', 1, 1, [1, 0], [1, 0], NO_ERROR),
+ PartitionMetadata('topic_3', 2, 0, [0, 1], [0, 1], NO_ERROR)
+ ])
+ ]
+ protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics)
+
+ # client loads metadata at init
+ client = KafkaClient(hosts=['broker_1:4567'])
+ self.assertDictEqual({
+ TopicAndPartition('topic_1', 0): brokers[1],
+ TopicAndPartition('topic_noleader', 0): None,
+ TopicAndPartition('topic_noleader', 1): None,
+ TopicAndPartition('topic_3', 0): brokers[0],
+ TopicAndPartition('topic_3', 1): brokers[1],
+ TopicAndPartition('topic_3', 2): brokers[0]},
+ client.topics_to_brokers)
+
+ # if we ask for metadata explicitly, it should raise errors
+ with self.assertRaises(LeaderNotAvailableError):
+ client.load_metadata_for_topics('topic_no_partitions')
+
+ with self.assertRaises(UnknownTopicOrPartitionError):
+ client.load_metadata_for_topics('topic_unknown')
+
+ # This should not raise
+ client.load_metadata_for_topics('topic_no_leader')
+
+ @patch('kafka.client.KafkaConnection')
+ @patch('kafka.client.KafkaProtocol')
+ def test_has_metadata_for_topic(self, protocol, conn):
+
+ conn.recv.return_value = 'response' # anything but None
+
+ brokers = [
+ BrokerMetadata(0, 'broker_1', 4567),
+ BrokerMetadata(1, 'broker_2', 5678)
+ ]
+
+ topics = [
+ TopicMetadata('topic_still_creating', NO_LEADER, []),
+ TopicMetadata('topic_doesnt_exist', UNKNOWN_TOPIC_OR_PARTITION, []),
+ TopicMetadata('topic_noleaders', NO_ERROR, [
+ PartitionMetadata('topic_noleaders', 0, -1, [], [], NO_LEADER),
+ PartitionMetadata('topic_noleaders', 1, -1, [], [], NO_LEADER),
+ ]),
+ ]
+ protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics)
+
+ client = KafkaClient(hosts=['broker_1:4567'])
+
+ # Topics with no partitions return False
+ self.assertFalse(client.has_metadata_for_topic('topic_still_creating'))
+ self.assertFalse(client.has_metadata_for_topic('topic_doesnt_exist'))
+
+ # Topic with partition metadata, but no leaders return True
+ self.assertTrue(client.has_metadata_for_topic('topic_noleaders'))
+
+ @patch('kafka.client.KafkaConnection')
+ @patch('kafka.client.KafkaProtocol')
+ def test_ensure_topic_exists(self, protocol, conn):
+
+ conn.recv.return_value = 'response' # anything but None
+
+ brokers = [
+ BrokerMetadata(0, 'broker_1', 4567),
+ BrokerMetadata(1, 'broker_2', 5678)
+ ]
+
+ topics = [
+ TopicMetadata('topic_still_creating', NO_LEADER, []),
+ TopicMetadata('topic_doesnt_exist', UNKNOWN_TOPIC_OR_PARTITION, []),
+ TopicMetadata('topic_noleaders', NO_ERROR, [
+ PartitionMetadata('topic_noleaders', 0, -1, [], [], NO_LEADER),
+ PartitionMetadata('topic_noleaders', 1, -1, [], [], NO_LEADER),
+ ]),
+ ]
+ protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics)
+
+ client = KafkaClient(hosts=['broker_1:4567'])
+
+ with self.assertRaises(UnknownTopicOrPartitionError):
+ client.ensure_topic_exists('topic_doesnt_exist', timeout=1)
+
+ with self.assertRaises(KafkaTimeoutError):
+ client.ensure_topic_exists('topic_still_creating', timeout=1)
+
+ # This should not raise
+ client.ensure_topic_exists('topic_noleaders', timeout=1)
+
+ @patch('kafka.client.KafkaConnection')
+ @patch('kafka.client.KafkaProtocol')
+ def test_get_leader_for_partitions_reloads_metadata(self, protocol, conn):
+ "Get leader for partitions reload metadata if it is not available"
+
+ conn.recv.return_value = 'response' # anything but None
+
+ brokers = [
+ BrokerMetadata(0, 'broker_1', 4567),
+ BrokerMetadata(1, 'broker_2', 5678)
+ ]
+
+ topics = [
+ TopicMetadata('topic_no_partitions', NO_LEADER, [])
+ ]
+ protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics)
+
+ client = KafkaClient(hosts=['broker_1:4567'])
+
+ # topic metadata is loaded but empty
+ self.assertDictEqual({}, client.topics_to_brokers)
+
+ topics = [
+ TopicMetadata('topic_one_partition', NO_ERROR, [
+ PartitionMetadata('topic_one_partition', 0, 0, [0, 1], [0, 1], NO_ERROR)
+ ])
+ ]
+ protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics)
+
+ # calling _get_leader_for_partition (from any broker aware request)
+ # will try loading metadata again for the same topic
+ leader = client._get_leader_for_partition('topic_one_partition', 0)
+
+ self.assertEqual(brokers[0], leader)
+ self.assertDictEqual({
+ TopicAndPartition('topic_one_partition', 0): brokers[0]},
+ client.topics_to_brokers)
+
+ @patch('kafka.client.KafkaConnection')
+ @patch('kafka.client.KafkaProtocol')
+ def test_get_leader_for_unassigned_partitions(self, protocol, conn):
+
+ conn.recv.return_value = 'response' # anything but None
+
+ brokers = [
+ BrokerMetadata(0, 'broker_1', 4567),
+ BrokerMetadata(1, 'broker_2', 5678)
+ ]
+
+ topics = [
+ TopicMetadata('topic_no_partitions', NO_LEADER, []),
+ TopicMetadata('topic_unknown', UNKNOWN_TOPIC_OR_PARTITION, []),
+ ]
+ protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics)
+
+ client = KafkaClient(hosts=['broker_1:4567'])
+
+ self.assertDictEqual({}, client.topics_to_brokers)
+
+ with self.assertRaises(LeaderNotAvailableError):
+ client._get_leader_for_partition('topic_no_partitions', 0)
+
+ with self.assertRaises(UnknownTopicOrPartitionError):
+ client._get_leader_for_partition('topic_unknown', 0)
+
+ @patch('kafka.client.KafkaConnection')
+ @patch('kafka.client.KafkaProtocol')
+ def test_get_leader_exceptions_when_noleader(self, protocol, conn):
+
+ conn.recv.return_value = 'response' # anything but None
+
+ brokers = [
+ BrokerMetadata(0, 'broker_1', 4567),
+ BrokerMetadata(1, 'broker_2', 5678)
+ ]
+
+ topics = [
+ TopicMetadata('topic_noleader', NO_ERROR, [
+ PartitionMetadata('topic_noleader', 0, -1, [], [],
+ NO_LEADER),
+ PartitionMetadata('topic_noleader', 1, -1, [], [],
+ NO_LEADER),
+ ]),
+ ]
+ protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics)
+
+ client = KafkaClient(hosts=['broker_1:4567'])
+ self.assertDictEqual(
+ {
+ TopicAndPartition('topic_noleader', 0): None,
+ TopicAndPartition('topic_noleader', 1): None
+ },
+ client.topics_to_brokers)
+
+ # No leader partitions -- raise LeaderNotAvailableError
+ with self.assertRaises(LeaderNotAvailableError):
+ self.assertIsNone(client._get_leader_for_partition('topic_noleader', 0))
+ with self.assertRaises(LeaderNotAvailableError):
+ self.assertIsNone(client._get_leader_for_partition('topic_noleader', 1))
+
+ # Unknown partitions -- raise UnknownTopicOrPartitionError
+ with self.assertRaises(UnknownTopicOrPartitionError):
+ self.assertIsNone(client._get_leader_for_partition('topic_noleader', 2))
+
+ topics = [
+ TopicMetadata('topic_noleader', NO_ERROR, [
+ PartitionMetadata('topic_noleader', 0, 0, [0, 1], [0, 1], NO_ERROR),
+ PartitionMetadata('topic_noleader', 1, 1, [1, 0], [1, 0], NO_ERROR)
+ ]),
+ ]
+ protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics)
+ self.assertEqual(brokers[0], client._get_leader_for_partition('topic_noleader', 0))
+ self.assertEqual(brokers[1], client._get_leader_for_partition('topic_noleader', 1))
+
+ @patch('kafka.client.KafkaConnection')
+ @patch('kafka.client.KafkaProtocol')
+ def test_send_produce_request_raises_when_noleader(self, protocol, conn):
+ "Send producer request raises LeaderNotAvailableError if leader is not available"
+
+ conn.recv.return_value = 'response' # anything but None
+
+ brokers = [
+ BrokerMetadata(0, 'broker_1', 4567),
+ BrokerMetadata(1, 'broker_2', 5678)
+ ]
+
+ topics = [
+ TopicMetadata('topic_noleader', NO_ERROR, [
+ PartitionMetadata('topic_noleader', 0, -1, [], [],
+ NO_LEADER),
+ PartitionMetadata('topic_noleader', 1, -1, [], [],
+ NO_LEADER),
+ ]),
+ ]
+ protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics)
+
+ client = KafkaClient(hosts=['broker_1:4567'])
+
+ requests = [ProduceRequest(
+ "topic_noleader", 0,
+ [create_message("a"), create_message("b")])]
+
+ with self.assertRaises(LeaderNotAvailableError):
+ client.send_produce_request(requests)
+
+ @patch('kafka.client.KafkaConnection')
+ @patch('kafka.client.KafkaProtocol')
+ def test_send_produce_request_raises_when_topic_unknown(self, protocol, conn):
+
+ conn.recv.return_value = 'response' # anything but None
+
+ brokers = [
+ BrokerMetadata(0, 'broker_1', 4567),
+ BrokerMetadata(1, 'broker_2', 5678)
+ ]
+
+ topics = [
+ TopicMetadata('topic_doesnt_exist', UNKNOWN_TOPIC_OR_PARTITION, []),
+ ]
+ protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics)
+
+ client = KafkaClient(hosts=['broker_1:4567'])
+
+ requests = [ProduceRequest(
+ "topic_doesnt_exist", 0,
+ [create_message("a"), create_message("b")])]
+
+ with self.assertRaises(UnknownTopicOrPartitionError):
+ client.send_produce_request(requests)
+
+ def test_timeout(self):
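+ # Simulate create_connection blocking for the full timeout (args[1] is the
+ # timeout passed by KafkaConnection) and then failing; the constructor
+ # should surface this as ConnectionError after at least 1.0s has elapsed.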
+ def _timeout(*args, **kwargs):
+ timeout = args[1]
+ sleep(timeout)
+ raise socket.timeout
+
+ with patch.object(socket, "create_connection", side_effect=_timeout):
+
+ with Timer() as t:
+ with self.assertRaises(ConnectionError):
+ KafkaConnection("nowhere", 1234, 1.0)
+ self.assertGreaterEqual(t.interval, 1.0)
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/db2bbf91/eagle-external/hadoop_jmx_collector/lib/kafka-python/test/test_client_integration.py
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/lib/kafka-python/test/test_client_integration.py b/eagle-external/hadoop_jmx_collector/lib/kafka-python/test/test_client_integration.py
new file mode 100644
index 0000000..c0331ea
--- /dev/null
+++ b/eagle-external/hadoop_jmx_collector/lib/kafka-python/test/test_client_integration.py
@@ -0,0 +1,67 @@
+import os
+
+from kafka.common import (
+ FetchRequest, OffsetCommitRequest, OffsetFetchRequest,
+ KafkaTimeoutError
+)
+
+from test.fixtures import ZookeeperFixture, KafkaFixture
+from test.testutil import (
+ KafkaIntegrationTestCase, kafka_versions
+)
+
+class TestKafkaClientIntegration(KafkaIntegrationTestCase):
+ @classmethod
+ def setUpClass(cls): # noqa
+ if not os.environ.get('KAFKA_VERSION'):
+ return
+
+ cls.zk = ZookeeperFixture.instance()
+ cls.server = KafkaFixture.instance(0, cls.zk.host, cls.zk.port)
+
+ @classmethod
+ def tearDownClass(cls): # noqa
+ if not os.environ.get('KAFKA_VERSION'):
+ return
+
+ cls.server.close()
+ cls.zk.close()
+
+ @kafka_versions("all")
+ def test_consume_none(self):
+ fetch = FetchRequest(self.topic, 0, 0, 1024)
+
+ fetch_resp, = self.client.send_fetch_request([fetch])
+ self.assertEqual(fetch_resp.error, 0)
+ self.assertEqual(fetch_resp.topic, self.topic)
+ self.assertEqual(fetch_resp.partition, 0)
+
+ messages = list(fetch_resp.messages)
+ self.assertEqual(len(messages), 0)
+
+ @kafka_versions("all")
+ def test_ensure_topic_exists(self):
+
+ # assume that self.topic was created by setUp
+ # if so, this should succeed
+ self.client.ensure_topic_exists(self.topic, timeout=1)
+
+ # ensure_topic_exists should fail with KafkaTimeoutError
+ with self.assertRaises(KafkaTimeoutError):
+ self.client.ensure_topic_exists(b"this_topic_doesnt_exist", timeout=0)
+
+ ####################
+ # Offset Tests #
+ ####################
+
+ @kafka_versions("0.8.1", "0.8.1.1", "0.8.2.0")
+ def test_commit_fetch_offsets(self):
+ req = OffsetCommitRequest(self.topic, 0, 42, b"metadata")
+ (resp,) = self.client.send_offset_commit_request(b"group", [req])
+ self.assertEqual(resp.error, 0)
+
+ req = OffsetFetchRequest(self.topic, 0)
+ (resp,) = self.client.send_offset_fetch_request(b"group", [req])
+ self.assertEqual(resp.error, 0)
+ self.assertEqual(resp.offset, 42)
+ self.assertEqual(resp.metadata, b"") # Metadata isn't stored for now
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/db2bbf91/eagle-external/hadoop_jmx_collector/lib/kafka-python/test/test_codec.py
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/lib/kafka-python/test/test_codec.py b/eagle-external/hadoop_jmx_collector/lib/kafka-python/test/test_codec.py
new file mode 100644
index 0000000..2d7670a
--- /dev/null
+++ b/eagle-external/hadoop_jmx_collector/lib/kafka-python/test/test_codec.py
@@ -0,0 +1,72 @@
+import struct
+
+from six.moves import xrange
+from . import unittest
+
+from kafka.codec import (
+ has_snappy, gzip_encode, gzip_decode,
+ snappy_encode, snappy_decode
+)
+
+from test.testutil import random_string
+
+class TestCodec(unittest.TestCase):
+ def test_gzip(self):
+ for i in xrange(1000):
+ s1 = random_string(100)
+ s2 = gzip_decode(gzip_encode(s1))
+ self.assertEqual(s1, s2)
+
+ @unittest.skipUnless(has_snappy(), "Snappy not available")
+ def test_snappy(self):
+ for i in xrange(1000):
+ s1 = random_string(100)
+ s2 = snappy_decode(snappy_encode(s1))
+ self.assertEqual(s1, s2)
+
+ @unittest.skipUnless(has_snappy(), "Snappy not available")
+ def test_snappy_detect_xerial(self):
+ import kafka as kafka1
+ _detect_xerial_stream = kafka1.codec._detect_xerial_stream
+
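+ # A xerial-framed stream begins with the 8-byte magic b'\x82SNAPPY\x00'
+ # followed by two 4-byte version fields (16 bytes total); anything shorter
+ # or with a different prefix should be treated as a plain snappy block.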
+ header = b'\x82SNAPPY\x00\x00\x00\x00\x01\x00\x00\x00\x01Some extra bytes'
+ false_header = b'\x01SNAPPY\x00\x00\x00\x01\x00\x00\x00\x01'
+ random_snappy = snappy_encode(b'SNAPPY' * 50)
+ short_data = b'\x01\x02\x03\x04'
+
+ self.assertTrue(_detect_xerial_stream(header))
+ self.assertFalse(_detect_xerial_stream(b''))
+ self.assertFalse(_detect_xerial_stream(b'\x00'))
+ self.assertFalse(_detect_xerial_stream(false_header))
+ self.assertFalse(_detect_xerial_stream(random_snappy))
+ self.assertFalse(_detect_xerial_stream(short_data))
+
+ @unittest.skipUnless(has_snappy(), "Snappy not available")
+ def test_snappy_decode_xerial(self):
+ header = b'\x82SNAPPY\x00\x00\x00\x00\x01\x00\x00\x00\x01'
+ random_snappy = snappy_encode(b'SNAPPY' * 50)
+ block_len = len(random_snappy)
+ random_snappy2 = snappy_encode(b'XERIAL' * 50)
+ block_len2 = len(random_snappy2)
+
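+ # The body of a xerial stream is a sequence of
+ # [4-byte big-endian block length][snappy-compressed block] pairs
+ # appended after the 16-byte header built above.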
+ to_test = header \
+ + struct.pack('!i', block_len) + random_snappy \
+ + struct.pack('!i', block_len2) + random_snappy2
+
+ self.assertEqual(snappy_decode(to_test), (b'SNAPPY' * 50) + (b'XERIAL' * 50))
+
+ @unittest.skipUnless(has_snappy(), "Snappy not available")
+ def test_snappy_encode_xerial(self):
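+ # With xerial_blocksize=300 the 600-byte input is split into two blocks;
+ # the expected bytes are the 16-byte header followed, per block, by a
+ # 4-byte big-endian length (0x18 == 24) and the compressed data.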
+ to_ensure = (
+ b'\x82SNAPPY\x00\x00\x00\x00\x01\x00\x00\x00\x01'
+ b'\x00\x00\x00\x18'
+ b'\xac\x02\x14SNAPPY\xfe\x06\x00\xfe\x06\x00\xfe\x06\x00\xfe\x06\x00\x96\x06\x00'
+ b'\x00\x00\x00\x18'
+ b'\xac\x02\x14XERIAL\xfe\x06\x00\xfe\x06\x00\xfe\x06\x00\xfe\x06\x00\x96\x06\x00'
+ )
+
+ to_test = (b'SNAPPY' * 50) + (b'XERIAL' * 50)
+
+ compressed = snappy_encode(to_test, xerial_compatible=True, xerial_blocksize=300)
+ self.assertEqual(compressed, to_ensure)
+
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/db2bbf91/eagle-external/hadoop_jmx_collector/lib/kafka-python/test/test_conn.py
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/lib/kafka-python/test/test_conn.py b/eagle-external/hadoop_jmx_collector/lib/kafka-python/test/test_conn.py
new file mode 100644
index 0000000..2c8f3b2
--- /dev/null
+++ b/eagle-external/hadoop_jmx_collector/lib/kafka-python/test/test_conn.py
@@ -0,0 +1,164 @@
+import socket
+import struct
+
+import mock
+from . import unittest
+
+from kafka.common import ConnectionError
+from kafka.conn import KafkaConnection, collect_hosts, DEFAULT_SOCKET_TIMEOUT_SECONDS
+
+class ConnTest(unittest.TestCase):
+ def setUp(self):
+ self.config = {
+ 'host': 'localhost',
+ 'port': 9090,
+ 'request_id': 0,
+ 'payload': b'test data',
+ 'payload2': b'another packet'
+ }
+
+ # Mocking socket.create_connection will cause _sock to always be a
+ # MagicMock()
+ patcher = mock.patch('socket.create_connection', spec=True)
+ self.MockCreateConn = patcher.start()
+ self.addCleanup(patcher.stop)
+
+ # Also mock socket.sendall() to appear successful
+ self.MockCreateConn().sendall.return_value = None
+
+ # And mock socket.recv() to return two payloads, then '', then raise
+ # Note that this currently ignores the num_bytes parameter to sock.recv()
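+ # Each mocked response is framed the way KafkaConnection expects to read
+ # it: a 4-byte big-endian size followed by that many payload bytes.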
+ payload_size = len(self.config['payload'])
+ payload2_size = len(self.config['payload2'])
+ self.MockCreateConn().recv.side_effect = [
+ struct.pack('>i', payload_size),
+ struct.pack('>%ds' % payload_size, self.config['payload']),
+ struct.pack('>i', payload2_size),
+ struct.pack('>%ds' % payload2_size, self.config['payload2']),
+ b''
+ ]
+
+ # Create a connection object
+ self.conn = KafkaConnection(self.config['host'], self.config['port'])
+
+ # Reset any mock counts caused by __init__
+ self.MockCreateConn.reset_mock()
+
+ def test_collect_hosts__happy_path(self):
+ hosts = "localhost:1234,localhost"
+ results = collect_hosts(hosts)
+
+ self.assertEqual(set(results), set([
+ ('localhost', 1234),
+ ('localhost', 9092),
+ ]))
+
+ def test_collect_hosts__string_list(self):
+ hosts = [
+ 'localhost:1234',
+ 'localhost',
+ ]
+
+ results = collect_hosts(hosts)
+
+ self.assertEqual(set(results), set([
+ ('localhost', 1234),
+ ('localhost', 9092),
+ ]))
+
+ def test_collect_hosts__with_spaces(self):
+ hosts = "localhost:1234, localhost"
+ results = collect_hosts(hosts)
+
+ self.assertEqual(set(results), set([
+ ('localhost', 1234),
+ ('localhost', 9092),
+ ]))
+
+ def test_send(self):
+ self.conn.send(self.config['request_id'], self.config['payload'])
+ self.conn._sock.sendall.assert_called_with(self.config['payload'])
+
+ def test_init_creates_socket_connection(self):
+ KafkaConnection(self.config['host'], self.config['port'])
+ self.MockCreateConn.assert_called_with((self.config['host'], self.config['port']), DEFAULT_SOCKET_TIMEOUT_SECONDS)
+
+ def test_init_failure_raises_connection_error(self):
+
+ def raise_error(*args):
+ raise socket.error
+
+ assert socket.create_connection is self.MockCreateConn
+ socket.create_connection.side_effect = raise_error
+ with self.assertRaises(ConnectionError):
+ KafkaConnection(self.config['host'], self.config['port'])
+
+ def test_send__reconnects_on_dirty_conn(self):
+
+ # Dirty the connection
+ try:
+ self.conn._raise_connection_error()
+ except ConnectionError:
+ pass
+
+ # Now test that sending attempts to reconnect
+ self.assertEqual(self.MockCreateConn.call_count, 0)
+ self.conn.send(self.config['request_id'], self.config['payload'])
+ self.assertEqual(self.MockCreateConn.call_count, 1)
+
+ def test_send__failure_sets_dirty_connection(self):
+
+ def raise_error(*args):
+ raise socket.error
+
+ assert isinstance(self.conn._sock, mock.Mock)
+ self.conn._sock.sendall.side_effect = raise_error
+ try:
+ self.conn.send(self.config['request_id'], self.config['payload'])
+ except ConnectionError:
+ self.assertIsNone(self.conn._sock)
+
+ def test_recv(self):
+
+ self.assertEqual(self.conn.recv(self.config['request_id']), self.config['payload'])
+
+ def test_recv__reconnects_on_dirty_conn(self):
+
+ # Dirty the connection
+ try:
+ self.conn._raise_connection_error()
+ except ConnectionError:
+ pass
+
+ # Now test that recv'ing attempts to reconnect
+ self.assertEqual(self.MockCreateConn.call_count, 0)
+ self.conn.recv(self.config['request_id'])
+ self.assertEqual(self.MockCreateConn.call_count, 1)
+
+ def test_recv__failure_sets_dirty_connection(self):
+
+ def raise_error(*args):
+ raise socket.error
+
+ # test that a failed recv marks the connection dirty (_sock reset to None)
+ assert isinstance(self.conn._sock, mock.Mock)
+ self.conn._sock.recv.side_effect = raise_error
+ try:
+ self.conn.recv(self.config['request_id'])
+ except ConnectionError:
+ self.assertIsNone(self.conn._sock)
+
+ def test_recv__doesnt_consume_extra_data_in_stream(self):
+
+ # Here just test that each call to recv will return a single payload
+ self.assertEqual(self.conn.recv(self.config['request_id']), self.config['payload'])
+ self.assertEqual(self.conn.recv(self.config['request_id']), self.config['payload2'])
+
+ def test_close__object_is_reusable(self):
+
+ # test that sending to a closed connection
+ # will re-connect and send data to the socket
+ self.conn.close()
+ self.conn.send(self.config['request_id'], self.config['payload'])
+ self.assertEqual(self.MockCreateConn.call_count, 1)
+ self.conn._sock.sendall.assert_called_with(self.config['payload'])
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/db2bbf91/eagle-external/hadoop_jmx_collector/lib/kafka-python/test/test_consumer.py
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/lib/kafka-python/test/test_consumer.py b/eagle-external/hadoop_jmx_collector/lib/kafka-python/test/test_consumer.py
new file mode 100644
index 0000000..7b8f370
--- /dev/null
+++ b/eagle-external/hadoop_jmx_collector/lib/kafka-python/test/test_consumer.py
@@ -0,0 +1,15 @@
+
+from mock import MagicMock
+from . import unittest
+
+from kafka import SimpleConsumer, KafkaConsumer
+from kafka.common import KafkaConfigurationError
+
+class TestKafkaConsumer(unittest.TestCase):
+ def test_non_integer_partitions(self):
+ with self.assertRaises(AssertionError):
+ SimpleConsumer(MagicMock(), 'group', 'topic', partitions=['0'])
+
+ def test_broker_list_required(self):
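+ # KafkaConsumer should raise KafkaConfigurationError when no broker list
+ # (bootstrap_servers) is configured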
+ with self.assertRaises(KafkaConfigurationError):
+ KafkaConsumer()