You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by sr...@apache.org on 2014/02/25 09:27:27 UTC

[17/19] git commit: Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into trunk

Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into trunk

Conflicts:
	core/src/main/scala/kafka/controller/KafkaController.scala


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5de68ef4
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5de68ef4
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5de68ef4

Branch: refs/heads/trunk
Commit: 5de68ef4aef7812fd9f2d5e4fb6158bf753658e3
Parents: 2d848c5 993e1aa
Author: Sriram Subramanian <sr...@gmail.com>
Authored: Mon Feb 24 01:50:02 2014 -0800
Committer: Sriram Subramanian <sr...@gmail.com>
Committed: Mon Feb 24 01:50:02 2014 -0800

----------------------------------------------------------------------
 HEADER                                          |   14 +
 LICENSE                                         |   31 -
 README.md                                       |  103 +-
 bin/kafka-run-class.sh                          |   28 +-
 bin/run-rat.sh                                  |   35 -
 build.gradle                                    |  331 +++
 clients/build.sbt                               |   11 -
 .../producer/BufferExhaustedException.java      |   17 -
 .../java/kafka/clients/producer/Callback.java   |   18 -
 .../kafka/clients/producer/KafkaProducer.java   |  286 ---
 .../kafka/clients/producer/MockProducer.java    |  199 --
 .../java/kafka/clients/producer/Producer.java   |   48 -
 .../kafka/clients/producer/ProducerConfig.java  |  130 --
 .../kafka/clients/producer/ProducerRecord.java  |   84 -
 .../kafka/clients/producer/RecordMetadata.java  |   39 -
 .../clients/producer/internals/BufferPool.java  |  223 --
 .../internals/FutureRecordMetadata.java         |   63 -
 .../clients/producer/internals/Metadata.java    |  120 -
 .../clients/producer/internals/Partitioner.java |   55 -
 .../internals/ProduceRequestResult.java         |   81 -
 .../producer/internals/RecordAccumulator.java   |  234 --
 .../clients/producer/internals/RecordBatch.java |   86 -
 .../clients/producer/internals/Sender.java      |  504 ----
 .../clients/tools/ProducerPerformance.java      |   65 -
 clients/src/main/java/kafka/common/Cluster.java |  123 -
 .../main/java/kafka/common/Configurable.java    |   15 -
 .../main/java/kafka/common/KafkaException.java  |   26 -
 clients/src/main/java/kafka/common/Metric.java  |   23 -
 clients/src/main/java/kafka/common/Node.java    |   76 -
 .../main/java/kafka/common/PartitionInfo.java   |   58 -
 .../main/java/kafka/common/TopicPartition.java  |   61 -
 .../kafka/common/config/AbstractConfig.java     |   93 -
 .../java/kafka/common/config/ConfigDef.java     |  253 --
 .../kafka/common/config/ConfigException.java    |   24 -
 .../java/kafka/common/errors/ApiException.java  |   35 -
 .../common/errors/CorruptRecordException.java   |   23 -
 .../errors/LeaderNotAvailableException.java     |   19 -
 .../kafka/common/errors/NetworkException.java   |   23 -
 .../errors/NotLeaderForPartitionException.java  |   23 -
 .../common/errors/OffsetMetadataTooLarge.java   |   22 -
 .../errors/OffsetOutOfRangeException.java       |   22 -
 .../common/errors/RecordTooLargeException.java  |   23 -
 .../kafka/common/errors/RetryableException.java |   31 -
 .../kafka/common/errors/TimeoutException.java   |   23 -
 .../common/errors/UnknownServerException.java   |   22 -
 .../UnknownTopicOrPartitionException.java       |   22 -
 .../java/kafka/common/metrics/CompoundStat.java |   40 -
 .../java/kafka/common/metrics/JmxReporter.java  |  184 --
 .../java/kafka/common/metrics/KafkaMetric.java  |   55 -
 .../java/kafka/common/metrics/Measurable.java   |   16 -
 .../kafka/common/metrics/MeasurableStat.java    |   10 -
 .../java/kafka/common/metrics/MetricConfig.java |   71 -
 .../main/java/kafka/common/metrics/Metrics.java |  190 --
 .../kafka/common/metrics/MetricsReporter.java   |   27 -
 .../main/java/kafka/common/metrics/Quota.java   |   36 -
 .../common/metrics/QuotaViolationException.java |   16 -
 .../main/java/kafka/common/metrics/Sensor.java  |  171 --
 .../main/java/kafka/common/metrics/Stat.java    |   16 -
 .../java/kafka/common/metrics/stats/Avg.java    |   33 -
 .../java/kafka/common/metrics/stats/Count.java  |   29 -
 .../kafka/common/metrics/stats/Histogram.java   |  137 --
 .../java/kafka/common/metrics/stats/Max.java    |   29 -
 .../java/kafka/common/metrics/stats/Min.java    |   29 -
 .../kafka/common/metrics/stats/Percentile.java  |   32 -
 .../kafka/common/metrics/stats/Percentiles.java |   76 -
 .../java/kafka/common/metrics/stats/Rate.java   |   85 -
 .../kafka/common/metrics/stats/SampledStat.java |  106 -
 .../java/kafka/common/metrics/stats/Total.java  |   31 -
 .../kafka/common/network/ByteBufferReceive.java |   45 -
 .../kafka/common/network/ByteBufferSend.java    |   54 -
 .../kafka/common/network/NetworkReceive.java    |   74 -
 .../java/kafka/common/network/NetworkSend.java  |   26 -
 .../main/java/kafka/common/network/Receive.java |   35 -
 .../java/kafka/common/network/Selectable.java   |   68 -
 .../java/kafka/common/network/Selector.java     |  349 ---
 .../main/java/kafka/common/network/Send.java    |   41 -
 .../java/kafka/common/protocol/ApiKeys.java     |   35 -
 .../main/java/kafka/common/protocol/Errors.java |   97 -
 .../java/kafka/common/protocol/ProtoUtils.java  |   97 -
 .../java/kafka/common/protocol/Protocol.java    |  130 --
 .../kafka/common/protocol/types/ArrayOf.java    |   63 -
 .../java/kafka/common/protocol/types/Field.java |   48 -
 .../kafka/common/protocol/types/Schema.java     |  134 --
 .../common/protocol/types/SchemaException.java  |   13 -
 .../kafka/common/protocol/types/Struct.java     |  227 --
 .../java/kafka/common/protocol/types/Type.java  |  216 --
 .../kafka/common/record/CompressionType.java    |   40 -
 .../common/record/InvalidRecordException.java   |   11 -
 .../main/java/kafka/common/record/LogEntry.java |   28 -
 .../java/kafka/common/record/MemoryRecords.java |  102 -
 .../main/java/kafka/common/record/Record.java   |  286 ---
 .../main/java/kafka/common/record/Records.java  |   29 -
 .../kafka/common/requests/RequestHeader.java    |   68 -
 .../java/kafka/common/requests/RequestSend.java |   38 -
 .../kafka/common/requests/ResponseHeader.java   |   45 -
 .../kafka/common/utils/AbstractIterator.java    |   72 -
 .../java/kafka/common/utils/CopyOnWriteMap.java |  130 --
 .../src/main/java/kafka/common/utils/Crc32.java | 2169 ------------------
 .../java/kafka/common/utils/KafkaThread.java    |   18 -
 .../java/kafka/common/utils/SystemTime.java     |   26 -
 .../src/main/java/kafka/common/utils/Time.java  |   23 -
 .../src/main/java/kafka/common/utils/Utils.java |  230 --
 .../producer/BufferExhaustedException.java      |   33 +
 .../apache/kafka/clients/producer/Callback.java |   34 +
 .../kafka/clients/producer/KafkaProducer.java   |  303 +++
 .../kafka/clients/producer/MockProducer.java    |  216 ++
 .../apache/kafka/clients/producer/Producer.java |   65 +
 .../kafka/clients/producer/ProducerConfig.java  |  170 ++
 .../kafka/clients/producer/ProducerRecord.java  |  100 +
 .../kafka/clients/producer/RecordMetadata.java  |   55 +
 .../clients/producer/internals/BufferPool.java  |  240 ++
 .../internals/FutureRecordMetadata.java         |   80 +
 .../clients/producer/internals/Metadata.java    |  139 ++
 .../clients/producer/internals/Partitioner.java |   72 +
 .../internals/ProduceRequestResult.java         |   98 +
 .../producer/internals/RecordAccumulator.java   |  250 ++
 .../clients/producer/internals/RecordBatch.java |   99 +
 .../clients/producer/internals/Sender.java      |  691 ++++++
 .../clients/tools/ProducerPerformance.java      |   85 +
 .../java/org/apache/kafka/common/Cluster.java   |  120 +
 .../org/apache/kafka/common/Configurable.java   |   31 +
 .../org/apache/kafka/common/KafkaException.java |   42 +
 .../java/org/apache/kafka/common/Metric.java    |   39 +
 .../main/java/org/apache/kafka/common/Node.java |   92 +
 .../org/apache/kafka/common/PartitionInfo.java  |   74 +
 .../org/apache/kafka/common/TopicPartition.java |   77 +
 .../kafka/common/config/AbstractConfig.java     |  110 +
 .../apache/kafka/common/config/ConfigDef.java   |  269 +++
 .../kafka/common/config/ConfigException.java    |   40 +
 .../kafka/common/errors/ApiException.java       |   51 +
 .../common/errors/CorruptRecordException.java   |   39 +
 .../common/errors/InvalidMetadataException.java |   39 +
 .../errors/LeaderNotAvailableException.java     |   27 +
 .../kafka/common/errors/NetworkException.java   |   39 +
 .../errors/NotLeaderForPartitionException.java  |   38 +
 .../common/errors/OffsetMetadataTooLarge.java   |   37 +
 .../errors/OffsetOutOfRangeException.java       |   38 +
 .../common/errors/RecordTooLargeException.java  |   38 +
 .../kafka/common/errors/RetriableException.java |   37 +
 .../kafka/common/errors/TimeoutException.java   |   38 +
 .../common/errors/UnknownServerException.java   |   39 +
 .../UnknownTopicOrPartitionException.java       |   37 +
 .../kafka/common/metrics/CompoundStat.java      |   56 +
 .../kafka/common/metrics/JmxReporter.java       |  201 ++
 .../kafka/common/metrics/KafkaMetric.java       |   71 +
 .../apache/kafka/common/metrics/Measurable.java |   32 +
 .../kafka/common/metrics/MeasurableStat.java    |   26 +
 .../kafka/common/metrics/MetricConfig.java      |   87 +
 .../apache/kafka/common/metrics/Metrics.java    |  207 ++
 .../kafka/common/metrics/MetricsReporter.java   |   43 +
 .../org/apache/kafka/common/metrics/Quota.java  |   52 +
 .../common/metrics/QuotaViolationException.java |   32 +
 .../org/apache/kafka/common/metrics/Sensor.java |  188 ++
 .../org/apache/kafka/common/metrics/Stat.java   |   32 +
 .../apache/kafka/common/metrics/stats/Avg.java  |   50 +
 .../kafka/common/metrics/stats/Count.java       |   46 +
 .../kafka/common/metrics/stats/Histogram.java   |  157 ++
 .../apache/kafka/common/metrics/stats/Max.java  |   46 +
 .../apache/kafka/common/metrics/stats/Min.java  |   46 +
 .../kafka/common/metrics/stats/Percentile.java  |   48 +
 .../kafka/common/metrics/stats/Percentiles.java |  121 +
 .../apache/kafka/common/metrics/stats/Rate.java |  106 +
 .../kafka/common/metrics/stats/SampledStat.java |  127 +
 .../kafka/common/metrics/stats/Total.java       |   47 +
 .../kafka/common/network/ByteBufferReceive.java |   61 +
 .../kafka/common/network/ByteBufferSend.java    |   70 +
 .../kafka/common/network/NetworkReceive.java    |   90 +
 .../kafka/common/network/NetworkSend.java       |   42 +
 .../apache/kafka/common/network/Receive.java    |   51 +
 .../apache/kafka/common/network/Selectable.java |   84 +
 .../apache/kafka/common/network/Selector.java   |  364 +++
 .../org/apache/kafka/common/network/Send.java   |   57 +
 .../apache/kafka/common/protocol/ApiKeys.java   |   51 +
 .../apache/kafka/common/protocol/Errors.java    |  114 +
 .../kafka/common/protocol/ProtoUtils.java       |   65 +
 .../apache/kafka/common/protocol/Protocol.java  |  147 ++
 .../kafka/common/protocol/types/ArrayOf.java    |   79 +
 .../kafka/common/protocol/types/Field.java      |   64 +
 .../kafka/common/protocol/types/Schema.java     |  156 ++
 .../common/protocol/types/SchemaException.java  |   29 +
 .../kafka/common/protocol/types/Struct.java     |  243 ++
 .../kafka/common/protocol/types/Type.java       |  232 ++
 .../kafka/common/record/CompressionType.java    |   56 +
 .../common/record/InvalidRecordException.java   |   27 +
 .../apache/kafka/common/record/LogEntry.java    |   44 +
 .../kafka/common/record/MemoryRecords.java      |  119 +
 .../org/apache/kafka/common/record/Record.java  |  303 +++
 .../org/apache/kafka/common/record/Records.java |   45 +
 .../kafka/common/requests/MetadataRequest.java  |   25 +
 .../kafka/common/requests/MetadataResponse.java |   77 +
 .../kafka/common/requests/RequestHeader.java    |   85 +
 .../kafka/common/requests/RequestSend.java      |   55 +
 .../kafka/common/requests/ResponseHeader.java   |   62 +
 .../kafka/common/utils/AbstractIterator.java    |   88 +
 .../kafka/common/utils/CopyOnWriteMap.java      |  146 ++
 .../org/apache/kafka/common/utils/Crc32.java    | 2169 ++++++++++++++++++
 .../apache/kafka/common/utils/KafkaThread.java  |   34 +
 .../apache/kafka/common/utils/SystemTime.java   |   42 +
 .../org/apache/kafka/common/utils/Time.java     |   39 +
 .../org/apache/kafka/common/utils/Utils.java    |  247 ++
 .../clients/common/network/SelectorTest.java    |  292 ---
 .../kafka/clients/producer/BufferPoolTest.java  |  170 --
 .../kafka/clients/producer/MetadataTest.java    |   49 -
 .../clients/producer/MockProducerTest.java      |   63 -
 .../kafka/clients/producer/PartitionerTest.java |   54 -
 .../clients/producer/RecordAccumulatorTest.java |  135 --
 .../kafka/clients/producer/RecordSendTest.java  |   78 -
 .../java/kafka/clients/producer/SenderTest.java |   87 -
 .../java/kafka/common/config/ConfigDefTest.java |   88 -
 .../kafka/common/metrics/JmxReporterTest.java   |   21 -
 .../java/kafka/common/metrics/MetricsTest.java  |  176 --
 .../common/metrics/stats/HistogramTest.java     |   56 -
 .../types/ProtocolSerializationTest.java        |   96 -
 .../kafka/common/record/MemoryRecordsTest.java  |   44 -
 .../java/kafka/common/record/RecordTest.java    |   87 -
 .../common/utils/AbstractIteratorTest.java      |   54 -
 .../test/java/kafka/common/utils/MockTime.java  |   28 -
 .../src/test/java/kafka/test/MetricsBench.java  |   38 -
 .../test/java/kafka/test/Microbenchmarks.java   |  143 --
 .../src/test/java/kafka/test/MockSelector.java  |   87 -
 clients/src/test/java/kafka/test/TestUtils.java |   95 -
 .../kafka/clients/producer/BufferPoolTest.java  |  187 ++
 .../kafka/clients/producer/MetadataTest.java    |   65 +
 .../clients/producer/MockProducerTest.java      |   82 +
 .../kafka/clients/producer/PartitionerTest.java |   81 +
 .../clients/producer/RecordAccumulatorTest.java |  146 ++
 .../kafka/clients/producer/RecordSendTest.java  |   94 +
 .../kafka/clients/producer/SenderTest.java      |  207 ++
 .../kafka/common/config/ConfigDefTest.java      |  106 +
 .../kafka/common/metrics/JmxReporterTest.java   |   40 +
 .../kafka/common/metrics/MetricsTest.java       |  234 ++
 .../common/metrics/stats/HistogramTest.java     |  103 +
 .../kafka/common/network/SelectorTest.java      |  308 +++
 .../types/ProtocolSerializationTest.java        |  118 +
 .../kafka/common/record/MemoryRecordsTest.java  |   63 +
 .../apache/kafka/common/record/RecordTest.java  |  106 +
 .../common/utils/AbstractIteratorTest.java      |   71 +
 .../org/apache/kafka/common/utils/MockTime.java |   46 +
 .../org/apache/kafka/test/MetricsBench.java     |   55 +
 .../org/apache/kafka/test/Microbenchmarks.java  |  160 ++
 .../org/apache/kafka/test/MockSelector.java     |  104 +
 .../java/org/apache/kafka/test/TestUtils.java   |  112 +
 contrib/hadoop-consumer/build.sbt               |    1 -
 contrib/hadoop-producer/build.sbt               |    1 -
 core/build.sbt                                  |   32 -
 .../src/main/scala/kafka/admin/AdminUtils.scala |    3 +-
 .../main/scala/kafka/admin/TopicCommand.scala   |  135 +-
 .../kafka/api/ControlledShutdownResponse.scala  |    4 +-
 .../scala/kafka/api/LeaderAndIsrRequest.scala   |    1 +
 .../scala/kafka/api/StopReplicaRequest.scala    |   22 +-
 .../scala/kafka/api/StopReplicaResponse.scala   |   18 +-
 .../scala/kafka/api/UpdateMetadataRequest.scala |    1 +
 .../main/scala/kafka/cluster/Partition.scala    |   36 +-
 .../scala/kafka/consumer/SimpleConsumer.scala   |    8 +-
 core/src/main/scala/kafka/consumer/package.html |   18 +
 .../controller/ControllerChannelManager.scala   |  160 +-
 .../kafka/controller/KafkaController.scala      |  328 ++-
 .../controller/PartitionLeaderSelector.scala    |   16 +-
 .../controller/PartitionStateMachine.scala      |  139 +-
 .../kafka/controller/ReplicaStateMachine.scala  |  194 +-
 .../kafka/controller/TopicDeletionManager.scala |  376 +++
 core/src/main/scala/kafka/log/package.html      |   18 +
 core/src/main/scala/kafka/message/package.html  |   18 +
 .../scala/kafka/network/BlockingChannel.scala   |   16 +-
 core/src/main/scala/kafka/network/package.html  |   18 +
 .../scala/kafka/producer/SyncProducer.scala     |    6 +-
 .../src/main/scala/kafka/server/KafkaApis.scala |   57 +-
 .../main/scala/kafka/server/KafkaConfig.scala   |    2 +-
 .../scala/kafka/server/KafkaHealthcheck.scala   |    9 +-
 .../main/scala/kafka/server/KafkaServer.scala   |    6 +-
 .../scala/kafka/server/OffsetCheckpoint.scala   |    1 -
 .../scala/kafka/server/ReplicaManager.scala     |  146 +-
 .../scala/kafka/server/TopicConfigManager.scala |    9 +-
 .../kafka/server/ZookeeperLeaderElector.scala   |   10 +-
 core/src/main/scala/kafka/server/package.html   |   18 +
 .../kafka/tools/newproducer/MirrorMaker.scala   |  184 ++
 .../scala/kafka/utils/CommandLineUtils.scala    |   24 +-
 core/src/main/scala/kafka/utils/ZkUtils.scala   |   20 +-
 core/src/main/scala/kafka/utils/package.html    |    1 -
 .../kafka/api/ProducerSendTest.scala            |  275 +++
 .../scala/other/kafka/TestCrcPerformance.scala  |   16 +
 .../scala/other/kafka/TestEndToEndLatency.scala |   22 +-
 .../test/scala/unit/kafka/admin/AdminTest.scala |   52 +-
 .../unit/kafka/admin/DeleteTopicTest.scala      |  452 ++++
 .../api/RequestResponseSerializationTest.scala  |    6 +-
 .../kafka/integration/PrimitiveApiTest.scala    |  140 +-
 .../scala/unit/kafka/server/LogOffsetTest.scala |    4 +-
 .../unit/kafka/server/LogRecoveryTest.scala     |    2 +-
 .../unit/kafka/server/OffsetCommitTest.scala    |   32 +-
 .../unit/kafka/server/SimpleFetchTest.scala     |   11 +-
 .../unit/kafka/utils/IteratorTemplateTest.scala |   16 +
 .../test/scala/unit/kafka/utils/JsonTest.scala  |   16 +
 .../test/scala/unit/kafka/utils/TestUtils.scala |   42 +-
 .../unit/kafka/zk/ZooKeeperTestHarness.scala    |    4 +-
 examples/build.sbt                              |    3 -
 gradle.properties                               |   23 +
 gradle/buildscript.gradle                       |   12 +
 gradle/license.gradle                           |    9 +
 gradle/wrapper/gradle-wrapper.jar               |  Bin 0 -> 49875 bytes
 gradle/wrapper/gradle-wrapper.properties        |    6 +
 gradlew                                         |  164 ++
 lib/sbt-launch.jar                              |  Bin 1103618 -> 0 bytes
 perf/build.sbt                                  |    1 -
 .../scala/kafka/perf/ConsumerPerformance.scala  |  105 +-
 .../scala/kafka/perf/ProducerPerformance.scala  |  216 +-
 project/Build.scala                             |  152 --
 project/build.properties                        |   17 -
 project/build/KafkaProject.scala                |  251 --
 project/plugins.sbt                             |    9 -
 sbt                                             |   16 -
 sbt.bat                                         |   17 -
 scala.gradle                                    |    5 +
 settings.gradle                                 |   17 +
 313 files changed, 17403 insertions(+), 13130 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/5de68ef4/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --cc core/src/main/scala/kafka/controller/KafkaController.scala
index f4f00b2,00a1f98..f12ffc2
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@@ -554,7 -612,8 +612,8 @@@ class KafkaController(val config : Kafk
      } catch {
        case e: Throwable => error("Error completing preferred replica leader election for partitions %s".format(partitions.mkString(",")), e)
      } finally {
 -      removePartitionsFromPreferredReplicaElection(partitions)
 +      removePartitionsFromPreferredReplicaElection(partitions, isTriggeredByAutoRebalance)
+       deleteTopicManager.resumeDeletionForTopics(partitions.map(_.topic))
      }
    }
  
@@@ -1008,6 -1090,6 +1092,7 @@@
              topicsNotInPreferredReplica =
                topicAndPartitionsForBroker.filter {
                  case(topicPartition, replicas) => {
++                  controllerContext.partitionLeadershipInfo.contains(topicPartition) &&
                    controllerContext.partitionLeadershipInfo(topicPartition).leaderAndIsr.leader != leaderBroker
                  }
                }
@@@ -1020,16 -1102,26 +1105,19 @@@
            // check ratio and if greater than desired ratio, trigger a rebalance for the topic partitions
            // that need to be on this broker
            if (imbalanceRatio > (config.leaderImbalancePerBrokerPercentage.toDouble / 100)) {
 -            inLock(controllerContext.controllerLock) {
 -              // do this check only if the broker is live and there are no partitions being reassigned currently
 -              // and preferred replica election is not in progress
 -              if (controllerContext.liveBrokerIds.contains(leaderBroker) &&
 -                  controllerContext.partitionsBeingReassigned.size == 0 &&
 -                  controllerContext.partitionsUndergoingPreferredReplicaElection.size == 0) {
 -                val zkPath = ZkUtils.PreferredReplicaLeaderElectionPath
 -                val partitionsList = topicsNotInPreferredReplica.keys.map(e => Map("topic" -> e.topic, "partition" -> e.partition))
 -                val jsonData = Json.encode(Map("version" -> 1, "partitions" -> partitionsList))
 -                try {
 -                  ZkUtils.createPersistentPath(zkClient, zkPath, jsonData)
 -                  info("Created preferred replica election path with %s".format(jsonData))
 -                } catch {
 -                  case e2: ZkNodeExistsException =>
 -                    val partitionsUndergoingPreferredReplicaElection =
 -                      PreferredReplicaLeaderElectionCommand.parsePreferredReplicaElectionData(ZkUtils.readData(zkClient, zkPath)._1)
 -                    error("Preferred replica leader election currently in progress for " +
 -                          "%s. Aborting operation".format(partitionsUndergoingPreferredReplicaElection));
 -                  case e3: Throwable =>
 -                    error("Error while trying to auto rebalance topics %s".format(topicsNotInPreferredReplica.keys))
 +            topicsNotInPreferredReplica.foreach {
 +              case(topicPartition, replicas) => {
-                 controllerContext.controllerLock synchronized {
++                inLock(controllerContext.controllerLock) {
 +                  // do this check only if the broker is live and there are no partitions being reassigned currently
 +                  // and preferred replica election is not in progress
 +                  if (controllerContext.liveBrokerIds.contains(leaderBroker) &&
-                     controllerContext.partitionsBeingReassigned.size == 0 &&
-                     controllerContext.partitionsUndergoingPreferredReplicaElection.size == 0) {
++                      controllerContext.partitionsBeingReassigned.size == 0 &&
++                      controllerContext.partitionsUndergoingPreferredReplicaElection.size == 0 &&
++                      !deleteTopicManager.isTopicQueuedUpForDeletion(topicPartition.topic) &&
++                      !deleteTopicManager.isTopicDeletionInProgress(topicPartition.topic) &&
++                      controllerContext.allTopics.contains(topicPartition.topic)) {
 +                    onPreferredReplicaElection(Set(topicPartition), false)
 +                  }
                  }
                }
              }