You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/12/16 03:30:41 UTC

[GitHub] [pulsar] codelipenghui commented on a diff in pull request #18672: [fix][broker] Fix PulsarRegistrationClient and ZkRegistrationClient not aware rack info problem.

codelipenghui commented on code in PR #18672:
URL: https://github.com/apache/pulsar/pull/18672#discussion_r1050334648


##########
pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMappingTest.java:
##########
@@ -198,4 +222,206 @@ public void testBookieInfoChange() throws Exception {
             assertNull(r.get(2));
         });
     }
+
+    @Test
+    public void testWithPulsarRegistrationClient() throws Exception {
+        String data = "{\"group1\": {\"" + BOOKIE1
+                + "\": {\"rack\": \"/rack0\", \"hostname\": \"bookie1.example.com\"}, \"" + BOOKIE2
+                + "\": {\"rack\": \"/rack1\", \"hostname\": \"bookie2.example.com\"}}}";
+        store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, data.getBytes(), Optional.empty()).join();
+
+        // Case1: ZKCache is given
+        BookieRackAffinityMapping mapping = new BookieRackAffinityMapping();
+        Field field = BookieRackAffinityMapping.class.getDeclaredField("racksWithHost");
+        field.setAccessible(true);
+
+        ClientConfiguration bkClientConf = new ClientConfiguration();
+        bkClientConf.setProperty(BookieRackAffinityMapping.METADATA_STORE_INSTANCE, store);
+
+        PulsarRegistrationClient pulsarRegistrationClient = new PulsarRegistrationClient(store, "/ledgers");
+        DefaultBookieAddressResolver defaultBookieAddressResolver = new DefaultBookieAddressResolver(pulsarRegistrationClient);
+
+        mapping.setBookieAddressResolver(defaultBookieAddressResolver);
+        mapping.setConf(bkClientConf);
+        List<String> racks = mapping
+                .resolve(Lists.newArrayList(BOOKIE1.getHostName(), BOOKIE2.getHostName(), BOOKIE3.getHostName()))
+                .stream().filter(Objects::nonNull).toList();
+        assertEquals(racks.size(), 0);
+
+        HashedWheelTimer timer = new HashedWheelTimer(

Review Comment:
   Stop the timer after the test.



##########
pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMappingTest.java:
##########
@@ -198,4 +222,206 @@ public void testBookieInfoChange() throws Exception {
             assertNull(r.get(2));
         });
     }
+
+    @Test
+    public void testWithPulsarRegistrationClient() throws Exception {
+        String data = "{\"group1\": {\"" + BOOKIE1
+                + "\": {\"rack\": \"/rack0\", \"hostname\": \"bookie1.example.com\"}, \"" + BOOKIE2
+                + "\": {\"rack\": \"/rack1\", \"hostname\": \"bookie2.example.com\"}}}";
+        store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, data.getBytes(), Optional.empty()).join();
+
+        // Case1: ZKCache is given
+        BookieRackAffinityMapping mapping = new BookieRackAffinityMapping();
+        Field field = BookieRackAffinityMapping.class.getDeclaredField("racksWithHost");
+        field.setAccessible(true);
+
+        ClientConfiguration bkClientConf = new ClientConfiguration();
+        bkClientConf.setProperty(BookieRackAffinityMapping.METADATA_STORE_INSTANCE, store);
+
+        PulsarRegistrationClient pulsarRegistrationClient = new PulsarRegistrationClient(store, "/ledgers");
+        DefaultBookieAddressResolver defaultBookieAddressResolver = new DefaultBookieAddressResolver(pulsarRegistrationClient);
+
+        mapping.setBookieAddressResolver(defaultBookieAddressResolver);
+        mapping.setConf(bkClientConf);
+        List<String> racks = mapping
+                .resolve(Lists.newArrayList(BOOKIE1.getHostName(), BOOKIE2.getHostName(), BOOKIE3.getHostName()))
+                .stream().filter(Objects::nonNull).toList();
+        assertEquals(racks.size(), 0);
+
+        HashedWheelTimer timer = new HashedWheelTimer(
+                new ThreadFactoryBuilder().setNameFormat("TestTimer-%d").build(),
+                bkClientConf.getTimeoutTimerTickDurationMs(), TimeUnit.MILLISECONDS,
+                bkClientConf.getTimeoutTimerNumTicks());
+
+        RackawareEnsemblePlacementPolicy repp = new RackawareEnsemblePlacementPolicy();
+        Class<?> clazz1 = Class.forName("org.apache.bookkeeper.client.TopologyAwareEnsemblePlacementPolicy");
+        Field field1 = clazz1.getDeclaredField("knownBookies");
+        field1.setAccessible(true);
+        Map<BookieId, BookieNode> knownBookies = (Map<BookieId, BookieNode>) field1.get(repp);
+        repp.initialize(bkClientConf, Optional.of(mapping), timer,
+                DISABLE_ALL, NullStatsLogger.INSTANCE, defaultBookieAddressResolver);
+
+        Class<?> clazz2 = Class.forName("org.apache.bookkeeper.client.BookieWatcherImpl");
+        Constructor<?> constructor =
+                clazz2.getDeclaredConstructor(ClientConfiguration.class, EnsemblePlacementPolicy.class,
+                        RegistrationClient.class, BookieAddressResolver.class, StatsLogger.class);
+        constructor.setAccessible(true);
+        Object o = constructor.newInstance(bkClientConf, repp, pulsarRegistrationClient, defaultBookieAddressResolver,
+                NullStatsLogger.INSTANCE);
+        Method method = clazz2.getDeclaredMethod("initialBlockingBookieRead");
+        method.setAccessible(true);
+        method.invoke(o);
+
+        Set<BookieId> bookieIds = new HashSet<>();
+        bookieIds.add(BOOKIE1.toBookieId());
+
+        Field field2 = BookieServiceInfoSerde.class.getDeclaredField("INSTANCE");
+        field2.setAccessible(true);
+        BookieServiceInfoSerde serviceInfoSerde = (BookieServiceInfoSerde) field2.get(null);
+
+        BookieServiceInfo bookieServiceInfo = BookieServiceInfoUtils.buildLegacyBookieServiceInfo(BOOKIE1.toString());
+        store.put("/ledgers/available/" + BOOKIE1, serviceInfoSerde.serialize("", bookieServiceInfo),
+                Optional.of(-1L)).get();
+
+        Awaitility.await().atMost(30, TimeUnit.SECONDS).until(() -> ((BookiesRackConfiguration)field.get(mapping)).get("group1").size() == 1);
+        racks = mapping
+                .resolve(Lists.newArrayList(BOOKIE1.getHostName(), BOOKIE2.getHostName(), BOOKIE3.getHostName()))
+                .stream().filter(Objects::nonNull).toList();
+        assertEquals(racks.size(), 1);
+        assertEquals(racks.get(0), "/rack0");
+        assertEquals(knownBookies.size(), 1);
+        assertEquals(knownBookies.get(BOOKIE1.toBookieId()).getNetworkLocation(), "/rack0");
+
+        bookieServiceInfo = BookieServiceInfoUtils.buildLegacyBookieServiceInfo(BOOKIE2.toString());
+        store.put("/ledgers/available/" + BOOKIE2, serviceInfoSerde.serialize("", bookieServiceInfo),
+                Optional.of(-1L)).get();
+        Awaitility.await().atMost(30, TimeUnit.SECONDS).until(() -> ((BookiesRackConfiguration)field.get(mapping)).get("group1").size() == 2);
+
+        racks = mapping
+                .resolve(Lists.newArrayList(BOOKIE1.getHostName(), BOOKIE2.getHostName(), BOOKIE3.getHostName()))
+                .stream().filter(Objects::nonNull).toList();
+        assertEquals(racks.size(), 2);
+        assertEquals(racks.get(0), "/rack0");
+        assertEquals(racks.get(1), "/rack1");
+        assertEquals(knownBookies.size(), 2);
+        assertEquals(knownBookies.get(BOOKIE1.toBookieId()).getNetworkLocation(), "/rack0");
+        assertEquals(knownBookies.get(BOOKIE2.toBookieId()).getNetworkLocation(), "/rack1");
+
+        bookieServiceInfo = BookieServiceInfoUtils.buildLegacyBookieServiceInfo(BOOKIE3.toString());
+        store.put("/ledgers/available/" + BOOKIE3, serviceInfoSerde.serialize("", bookieServiceInfo),
+                Optional.of(-1L)).get();
+        Awaitility.await().atMost(30, TimeUnit.SECONDS).until(() -> ((BookiesRackConfiguration)field.get(mapping)).get("group1").size() == 2);
+
+        racks = mapping
+                .resolve(Lists.newArrayList(BOOKIE1.getHostName(), BOOKIE2.getHostName(), BOOKIE3.getHostName()))
+                .stream().filter(Objects::nonNull).toList();
+        assertEquals(racks.size(), 2);
+        assertEquals(racks.get(0), "/rack0");
+        assertEquals(racks.get(1), "/rack1");
+        assertEquals(knownBookies.size(), 3);
+        assertEquals(knownBookies.get(BOOKIE1.toBookieId()).getNetworkLocation(), "/rack0");
+        assertEquals(knownBookies.get(BOOKIE2.toBookieId()).getNetworkLocation(), "/rack1");
+        assertEquals(knownBookies.get(BOOKIE3.toBookieId()).getNetworkLocation(), "/default-rack");
+    }
+
+    @Test
+    public void testWithZkRegistrationClient() throws Exception {
+        String data = "{\"group1\": {\"" + BOOKIE1
+                + "\": {\"rack\": \"/rack0\", \"hostname\": \"bookie1.example.com\"}, \"" + BOOKIE2
+                + "\": {\"rack\": \"/rack1\", \"hostname\": \"bookie2.example.com\"}}}";
+        store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, data.getBytes(), Optional.empty()).join();
+
+        // Case1: ZKCache is given
+        BookieRackAffinityMapping mapping = new BookieRackAffinityMapping();
+        Field field = BookieRackAffinityMapping.class.getDeclaredField("racksWithHost");
+        field.setAccessible(true);
+
+        ClientConfiguration bkClientConf = new ClientConfiguration();
+        bkClientConf.setProperty(BookieRackAffinityMapping.METADATA_STORE_INSTANCE, store);
+
+        PulsarRegistrationClient pulsarRegistrationClient = new PulsarRegistrationClient(store, "/ledgers");
+        DefaultBookieAddressResolver defaultBookieAddressResolver = new DefaultBookieAddressResolver(pulsarRegistrationClient);
+
+        mapping.setBookieAddressResolver(defaultBookieAddressResolver);
+        mapping.setConf(bkClientConf);
+        List<String> racks = mapping
+                .resolve(Lists.newArrayList(BOOKIE1.getHostName(), BOOKIE2.getHostName(), BOOKIE3.getHostName()))
+                .stream().filter(Objects::nonNull).toList();
+        assertEquals(racks.size(), 0);
+
+        HashedWheelTimer timer = new HashedWheelTimer(

Review Comment:
   Stop the timer after the test.



##########
pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java:
##########
@@ -133,6 +137,28 @@ public synchronized void setConf(Configuration conf) {
         }
     }
 
+    private void watchAvailableBookies() {
+        BookieAddressResolver bookieAddressResolver = getBookieAddressResolver();
+        if (bookieAddressResolver instanceof DefaultBookieAddressResolver) {
+            try {
+                Field field = DefaultBookieAddressResolver.class.getDeclaredField("registrationClient");
+                field.setAccessible(true);
+                RegistrationClient registrationClient = (RegistrationClient) field.get(bookieAddressResolver);
+                registrationClient.watchWritableBookies(versioned -> {
+                    try {
+                        racksWithHost = bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH).get()

Review Comment:
   No, it will load the data from the metadata store when there is a cache miss.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org