You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@solr.apache.org by ds...@apache.org on 2024/02/01 04:10:32 UTC

(solr) branch branch_9x updated: SOLR-16699: Add Collection creation time to CLUSTERSTATUS and COLSTATUS API responses (#2226)

This is an automated email from the ASF dual-hosted git repository.

dsmiley pushed a commit to branch branch_9x
in repository https://gitbox.apache.org/repos/asf/solr.git


The following commit(s) were added to refs/heads/branch_9x by this push:
     new 04706685024 SOLR-16699: Add Collection creation time to CLUSTERSTATUS and COLSTATUS API responses (#2226)
04706685024 is described below

commit 047066850243102828142c326a412afc732c25af
Author: pjmcarthur <92...@users.noreply.github.com>
AuthorDate: Wed Jan 31 22:19:51 2024 -0500

    SOLR-16699: Add Collection creation time to CLUSTERSTATUS and COLSTATUS API responses (#2226)
    
    Using ZooKeeper "ctime" of the node.
    
    Co-authored-by: Julien Pilourdault
    Co-authored-by: Paul McArthur <pm...@proton.me>
    Co-authored-by: David Smiley <ds...@salesforce.com>
---
 solr/CHANGES.txt                                   |   3 +
 .../solr/cloud/DistributedClusterStateUpdater.java |   4 +-
 .../solr/cloud/overseer/ClusterStateMutator.java   |  12 ++-
 .../solr/cloud/overseer/CollectionMutator.java     |   1 +
 .../apache/solr/cloud/overseer/ZkStateWriter.java  |   6 +-
 .../org/apache/solr/core/backup/BackupManager.java |   6 +-
 .../apache/solr/handler/admin/ClusterStatus.java   |   2 +
 .../org/apache/solr/handler/admin/ColStatus.java   |   1 +
 .../org/apache/solr/cloud/ClusterStateTest.java    |  20 +++-
 .../apache/solr/cloud/CollectionsAPISolrJTest.java |  31 ++++++
 .../OverseerCollectionConfigSetProcessorTest.java  |   2 +
 .../test/org/apache/solr/cloud/SliceStateTest.java |   4 +-
 .../SimpleCollectionCreateDeleteTest.java          |   7 +-
 .../cloud/api/collections/TestCollectionAPI.java   |   6 +-
 .../solr/cloud/overseer/ZkStateReaderTest.java     |  36 +++++--
 .../solr/cloud/overseer/ZkStateWriterTest.java     | 103 +++++++-----------
 .../pages/collection-management.adoc               |   1 +
 .../solrj/impl/ZkClientClusterStateProvider.java   |  10 +-
 .../org/apache/solr/common/cloud/SolrZkClient.java |  21 ++++
 .../apache/solr/common/cloud/ZkStateReader.java    |   8 +-
 .../apache/solr/common/cloud/SolrZkClientTest.java |  32 ++++++
 .../solrj/impl/BaseHttpClusterStateProvider.java   |  14 ++-
 .../org/apache/solr/common/cloud/ClusterState.java |  30 ++++--
 .../apache/solr/common/cloud/DocCollection.java    |  31 ++++--
 .../solrj/impl/CloudSolrClientCacheTest.java       |   4 +-
 .../solrj/impl/ClusterStateProviderTest.java       | 120 +++++++++++++++++++++
 26 files changed, 415 insertions(+), 100 deletions(-)

diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt
index a0a88c50fab..92f981f721a 100644
--- a/solr/CHANGES.txt
+++ b/solr/CHANGES.txt
@@ -14,6 +14,9 @@ Improvements
 * SOLR-17119: When registering or updating a ConfigurablePlugin through the `/cluster/plugin` API,
   config validation exceptions are now propagated to the callers. (Yohann Callea)
 
+* SOLR-16699: Add Collection creation time to CLUSTERSTATUS and COLSTATUS API responses
+  (Julien Pilourdault, Paul McArthur, David Smiley)
+
 Optimizations
 ---------------------
 (No changes)
diff --git a/solr/core/src/java/org/apache/solr/cloud/DistributedClusterStateUpdater.java b/solr/core/src/java/org/apache/solr/cloud/DistributedClusterStateUpdater.java
index 8b789ea7f09..658cd8f02d4 100644
--- a/solr/core/src/java/org/apache/solr/cloud/DistributedClusterStateUpdater.java
+++ b/solr/core/src/java/org/apache/solr/cloud/DistributedClusterStateUpdater.java
@@ -31,6 +31,7 @@ import static org.apache.solr.common.params.CollectionParams.CollectionAction.DE
 import static org.apache.solr.common.params.CollectionParams.CollectionAction.MODIFYCOLLECTION;
 
 import java.lang.invoke.MethodHandles;
+import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.EnumMap;
@@ -636,7 +637,8 @@ public class DistributedClusterStateUpdater {
               data,
               Collections.emptySet(),
               updater.getCollectionName(),
-              zkStateReader.getZkClient());
+              zkStateReader.getZkClient(),
+              Instant.ofEpochMilli(stat.getCtime()));
 
       return clusterState;
     }
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/ClusterStateMutator.java b/solr/core/src/java/org/apache/solr/cloud/overseer/ClusterStateMutator.java
index 11b3795745b..a5edff69aff 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/ClusterStateMutator.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/ClusterStateMutator.java
@@ -19,6 +19,7 @@ package org.apache.solr.cloud.overseer;
 import static org.apache.solr.common.params.CommonParams.NAME;
 
 import java.lang.invoke.MethodHandles;
+import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -133,9 +134,18 @@ public class ClusterStateMutator {
     }
 
     assert !collectionProps.containsKey(CollectionAdminParams.COLL_CONF);
+
+    // This instance does not fully capture what will be persisted: the znodeVersion and
+    // creationTime are only definitively set in ZK, hence the placeholder values passed here.
     DocCollection newCollection =
         DocCollection.create(
-            cName, slices, collectionProps, router, -1, stateManager.getPrsSupplier(cName));
+            cName,
+            slices,
+            collectionProps,
+            router,
+            -1,
+            Instant.EPOCH,
+            stateManager.getPrsSupplier(cName));
 
     return new ZkWriteCommand(cName, newCollection);
   }
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/CollectionMutator.java b/solr/core/src/java/org/apache/solr/cloud/overseer/CollectionMutator.java
index 318dd595d31..2c3909764df 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/CollectionMutator.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/CollectionMutator.java
@@ -176,6 +176,7 @@ public class CollectionMutator {
             props,
             coll.getRouter(),
             coll.getZNodeVersion(),
+            coll.getCreationTime(),
             stateManager.getPrsSupplier(coll.getName()));
     if (replicaOps == null) {
       return new ZkWriteCommand(coll.getName(), collection);
diff --git a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
index f61b9a28ece..f6f10689410 100644
--- a/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
+++ b/solr/core/src/java/org/apache/solr/cloud/overseer/ZkStateWriter.java
@@ -20,6 +20,7 @@ import static java.util.Collections.singletonMap;
 
 import com.codahale.metrics.Timer;
 import java.lang.invoke.MethodHandles;
+import java.time.Instant;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Locale;
@@ -303,11 +304,13 @@ public class ZkStateWriter {
                       c.getProperties(),
                       c.getRouter(),
                       stat.getVersion(),
+                      Instant.ofEpochMilli(stat.getCtime()),
                       PerReplicaStatesOps.getZkClientPrsSupplier(reader.getZkClient(), path));
               clusterState = clusterState.copyWith(name, newCollection);
             } else {
               log.debug("going to create_collection {}", path);
-              reader.getZkClient().create(path, data, CreateMode.PERSISTENT, true);
+              Stat stat = new Stat();
+              reader.getZkClient().create(path, data, CreateMode.PERSISTENT, true, stat);
               DocCollection newCollection =
                   DocCollection.create(
                       name,
@@ -315,6 +318,7 @@ public class ZkStateWriter {
                       c.getProperties(),
                       c.getRouter(),
                       0,
+                      Instant.ofEpochMilli(stat.getCtime()),
                       PerReplicaStatesOps.getZkClientPrsSupplier(reader.getZkClient(), path));
               clusterState = clusterState.copyWith(name, newCollection);
             }
diff --git a/solr/core/src/java/org/apache/solr/core/backup/BackupManager.java b/solr/core/src/java/org/apache/solr/core/backup/BackupManager.java
index 8ff78b27e08..e846da51659 100644
--- a/solr/core/src/java/org/apache/solr/core/backup/BackupManager.java
+++ b/solr/core/src/java/org/apache/solr/core/backup/BackupManager.java
@@ -24,6 +24,7 @@ import java.io.Writer;
 import java.lang.invoke.MethodHandles;
 import java.net.URI;
 import java.nio.charset.StandardCharsets;
+import java.time.Instant;
 import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
@@ -221,7 +222,10 @@ public class BackupManager {
         repository.openInput(zkStateDir, COLLECTION_PROPS_FILE, IOContext.DEFAULT)) {
       byte[] arr = new byte[(int) is.length()]; // probably ok since the json file should be small.
       is.readBytes(arr, 0, (int) is.length());
-      ClusterState c_state = ClusterState.createFromJson(-1, arr, Collections.emptySet(), null);
+      // Use a default creation time: we are not reading actual ZooKeeper state here. The restored
+      // collection will get a new creation time when it is persisted in ZooKeeper.
+      ClusterState c_state =
+          ClusterState.createFromJson(-1, arr, Collections.emptySet(), Instant.EPOCH, null);
       return c_state.getCollection(collectionName);
     }
   }
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/ClusterStatus.java b/solr/core/src/java/org/apache/solr/handler/admin/ClusterStatus.java
index 9130fcc317a..f897aea129c 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/ClusterStatus.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/ClusterStatus.java
@@ -183,6 +183,8 @@ public class ClusterStatus {
       collectionStatus = getCollectionStatus(docCollection, name, requestedShards);
 
       collectionStatus.put("znodeVersion", clusterStateCollection.getZNodeVersion());
+      collectionStatus.put(
+          "creationTimeMillis", clusterStateCollection.getCreationTime().toEpochMilli());
 
       if (collectionVsAliases.containsKey(name) && !collectionVsAliases.get(name).isEmpty()) {
         collectionStatus.put("aliases", collectionVsAliases.get(name));
diff --git a/solr/core/src/java/org/apache/solr/handler/admin/ColStatus.java b/solr/core/src/java/org/apache/solr/handler/admin/ColStatus.java
index 774737e7e21..6befd13b192 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/ColStatus.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/ColStatus.java
@@ -103,6 +103,7 @@ public class ColStatus {
       }
       SimpleOrderedMap<Object> colMap = new SimpleOrderedMap<>();
       colMap.add("znodeVersion", coll.getZNodeVersion());
+      colMap.add("creationTimeMillis", coll.getCreationTime().toEpochMilli());
       Map<String, Object> props = new TreeMap<>(coll.getProperties());
       props.remove("shards");
       colMap.add("properties", props);
diff --git a/solr/core/src/test/org/apache/solr/cloud/ClusterStateTest.java b/solr/core/src/test/org/apache/solr/cloud/ClusterStateTest.java
index ac9b5ffa856..068a8f38a4d 100644
--- a/solr/core/src/test/org/apache/solr/cloud/ClusterStateTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/ClusterStateTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.solr.cloud;
 
+import java.time.Instant;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -60,17 +61,21 @@ public class ClusterStateTest extends SolrTestCaseJ4 {
     slices.put("shard2", slice2);
     collectionStates.put(
         "collection1",
-        DocCollection.create("collection1", slices, props, DocRouter.DEFAULT, 0, null));
+        DocCollection.create(
+            "collection1", slices, props, DocRouter.DEFAULT, 0, Instant.EPOCH, null));
     collectionStates.put(
         "collection2",
-        DocCollection.create("collection2", slices, props, DocRouter.DEFAULT, 0, null));
+        DocCollection.create(
+            "collection2", slices, props, DocRouter.DEFAULT, 0, Instant.EPOCH, null));
 
     ClusterState clusterState = new ClusterState(liveNodes, collectionStates);
     assertFalse(clusterState.getCollection("collection1").getProperties().containsKey("shards"));
 
     byte[] bytes = Utils.toJSON(clusterState);
 
-    ClusterState loadedClusterState = ClusterState.createFromJson(-1, bytes, liveNodes, null);
+    Instant creationTime = Instant.now();
+    ClusterState loadedClusterState =
+        ClusterState.createFromJson(-1, bytes, liveNodes, creationTime, null);
     assertFalse(
         loadedClusterState.getCollection("collection1").getProperties().containsKey("shards"));
 
@@ -96,13 +101,18 @@ public class ClusterStateTest extends SolrTestCaseJ4 {
             .get("node1")
             .getStr("prop2"));
 
-    loadedClusterState = ClusterState.createFromJson(-1, new byte[0], liveNodes, null);
+    assertEquals(creationTime, loadedClusterState.getCollection("collection1").getCreationTime());
+    assertEquals(creationTime, loadedClusterState.getCollection("collection2").getCreationTime());
+
+    loadedClusterState =
+        ClusterState.createFromJson(-1, new byte[0], liveNodes, Instant.now(), null);
 
     assertEquals(
         "Provided liveNodes not used properly", 2, loadedClusterState.getLiveNodes().size());
     assertEquals("Should not have collections", 0, loadedClusterState.getCollectionsMap().size());
 
-    loadedClusterState = ClusterState.createFromJson(-1, (byte[]) null, liveNodes, null);
+    loadedClusterState =
+        ClusterState.createFromJson(-1, (byte[]) null, liveNodes, Instant.now(), null);
 
     assertEquals(
         "Provided liveNodes not used properly", 2, loadedClusterState.getLiveNodes().size());
diff --git a/solr/core/src/test/org/apache/solr/cloud/CollectionsAPISolrJTest.java b/solr/core/src/test/org/apache/solr/cloud/CollectionsAPISolrJTest.java
index 9eabee8a970..6bf199bd28d 100644
--- a/solr/core/src/test/org/apache/solr/cloud/CollectionsAPISolrJTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/CollectionsAPISolrJTest.java
@@ -27,6 +27,7 @@ import java.lang.invoke.MethodHandles;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Date;
@@ -1209,4 +1210,34 @@ public class CollectionsAPISolrJTest extends SolrCloudTestCase {
                 .unsetAttribute("non_existent_attr")
                 .process(cluster.getSolrClient()));
   }
+
+  @Test
+  public void testCollectionCreationTime() throws SolrServerException, IOException {
+    Instant beforeCreation = Instant.now(); // lower bound, read before the create request is sent
+
+    String collectionName = getSaferTestName();
+    CollectionAdminRequest.createCollection(collectionName, "conf", 1, 1)
+        .setPerReplicaState(SolrCloudTestCase.USE_PER_REPLICA_STATE)
+        .process(cluster.getSolrClient());
+
+    cluster.waitForActiveCollection(collectionName, 1, 1);
+
+    Instant afterCreation = Instant.now(); // upper bound, read after waitForActiveCollection returns
+
+    CollectionAdminRequest.ColStatus req = CollectionAdminRequest.collectionStatus(collectionName);
+    CollectionAdminResponse response = req.process(cluster.getSolrClient());
+    assertEquals(0, response.getStatus());
+
+    NamedList<?> colStatus = (NamedList<?>) response.getResponse().get(collectionName);
+    Long creationTimeMillis = (Long) colStatus._get("creationTimeMillis", null);
+    assertNotNull("creationTimeMillis was not included in COLSTATUS response", creationTimeMillis);
+
+    Instant creationTime = Instant.ofEpochMilli(creationTimeMillis); // must lie strictly in bounds
+    assertTrue(
+        "COLSTATUS creationTimeMillis should be after the test started",
+        creationTime.isAfter(beforeCreation));
+    assertTrue(
+        "COLSTATUS creationTimeMillis should not be after the collection creation was completed",
+        creationTime.isBefore(afterCreation));
+  }
 }
diff --git a/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionConfigSetProcessorTest.java b/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionConfigSetProcessorTest.java
index 5e179e0d0f5..b38ad73f820 100644
--- a/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionConfigSetProcessorTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/OverseerCollectionConfigSetProcessorTest.java
@@ -30,6 +30,7 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import java.lang.invoke.MethodHandles;
+import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -688,6 +689,7 @@ public class OverseerCollectionConfigSetProcessorTest extends SolrTestCaseJ4 {
                       props.getProperties(),
                       DocRouter.DEFAULT,
                       0,
+                      Instant.EPOCH,
                       distribStateManagerMock.getPrsSupplier(collName))));
       }
       if (CollectionParams.CollectionAction.ADDREPLICA.isEqual(props.getStr("operation"))) {
diff --git a/solr/core/src/test/org/apache/solr/cloud/SliceStateTest.java b/solr/core/src/test/org/apache/solr/cloud/SliceStateTest.java
index 4517cc1bc15..64ec9f3c035 100644
--- a/solr/core/src/test/org/apache/solr/cloud/SliceStateTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/SliceStateTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.solr.cloud;
 
+import java.time.Instant;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -59,7 +60,8 @@ public class SliceStateTest extends SolrTestCaseJ4 {
 
     ClusterState clusterState = new ClusterState(liveNodes, collectionStates);
     byte[] bytes = Utils.toJSON(clusterState);
-    ClusterState loadedClusterState = ClusterState.createFromJson(-1, bytes, liveNodes, null);
+    ClusterState loadedClusterState =
+        ClusterState.createFromJson(-1, bytes, liveNodes, Instant.now(), null);
 
     assertSame(
         "Default state not set to active",
diff --git a/solr/core/src/test/org/apache/solr/cloud/api/collections/SimpleCollectionCreateDeleteTest.java b/solr/core/src/test/org/apache/solr/cloud/api/collections/SimpleCollectionCreateDeleteTest.java
index 9d8c18f42d7..64fcb5edd4e 100644
--- a/solr/core/src/test/org/apache/solr/cloud/api/collections/SimpleCollectionCreateDeleteTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/api/collections/SimpleCollectionCreateDeleteTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.solr.cloud.api.collections;
 
+import java.time.Instant;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
@@ -123,7 +124,11 @@ public class SimpleCollectionCreateDeleteTest extends AbstractFullDistribZkTestB
 
     DocCollection c =
         ClusterState.createFromCollectionMap(
-                0, (Map<String, Object>) Utils.fromJSON(node.data), Collections.emptySet(), null)
+                0,
+                (Map<String, Object>) Utils.fromJSON(node.data),
+                Collections.emptySet(),
+                Instant.EPOCH,
+                null)
             .getCollection(collectionName);
 
     Set<String> knownKeys =
diff --git a/solr/core/src/test/org/apache/solr/cloud/api/collections/TestCollectionAPI.java b/solr/core/src/test/org/apache/solr/cloud/api/collections/TestCollectionAPI.java
index 29c103cb9a3..b3e7cfb6d92 100644
--- a/solr/core/src/test/org/apache/solr/cloud/api/collections/TestCollectionAPI.java
+++ b/solr/core/src/test/org/apache/solr/cloud/api/collections/TestCollectionAPI.java
@@ -17,6 +17,7 @@
 package org.apache.solr.cloud.api.collections;
 
 import java.io.IOException;
+import java.time.Instant;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -495,7 +496,10 @@ public class TestCollectionAPI extends ReplicaPropertiesBase {
       Map<String, Object> collection = (Map<String, Object>) collections.get(COLLECTION_NAME);
       assertNotNull(collection);
       assertEquals("conf1", collection.get("configName"));
-      //      assertEquals("1", collection.get("nrtReplicas"));
+
+      Instant creationTime = Instant.ofEpochMilli((long) collection.get("creationTimeMillis"));
+      assertEquals(
+          creationTime, client.getClusterState().getCollection(COLLECTION_NAME).getCreationTime());
     }
   }
 
diff --git a/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java b/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java
index 3e1ca33963c..43c52d96a19 100644
--- a/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateReaderTest.java
@@ -20,6 +20,7 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 import java.nio.file.Path;
+import java.time.Instant;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -56,6 +57,7 @@ import org.apache.solr.handler.admin.ConfigSetsHandler;
 import org.apache.solr.util.LogLevel;
 import org.apache.solr.util.TimeOut;
 import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.data.Stat;
 import org.junit.After;
 import org.junit.Before;
 import org.slf4j.Logger;
@@ -146,6 +148,7 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 {
                 Map.of(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
                 DocRouter.DEFAULT,
                 0,
+                Instant.now(),
                 PerReplicaStatesOps.getZkClientPrsSupplier(
                     fixture.zkClient, DocCollection.getCollectionPath("c1"))));
 
@@ -173,6 +176,7 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 {
             Map.of(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
             DocRouter.DEFAULT,
             0,
+            Instant.now(),
             PerReplicaStatesOps.getZkClientPrsSupplier(
                 fixture.zkClient, DocCollection.getCollectionPath("c1")));
     ZkWriteCommand wc = new ZkWriteCommand("c1", state);
@@ -192,6 +196,7 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 {
             props,
             DocRouter.DEFAULT,
             0,
+            Instant.now(),
             PerReplicaStatesOps.getZkClientPrsSupplier(
                 fixture.zkClient, DocCollection.getCollectionPath("c1")));
     wc = new ZkWriteCommand("c1", state);
@@ -233,6 +238,7 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 {
             Map.of(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
             DocRouter.DEFAULT,
             0,
+            Instant.now(),
             PerReplicaStatesOps.getZkClientPrsSupplier(
                 fixture.zkClient, DocCollection.getCollectionPath("c1")));
     ZkWriteCommand wc = new ZkWriteCommand("c1", state);
@@ -246,6 +252,10 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 {
     ClusterState.CollectionRef ref = reader.getClusterState().getCollectionRef("c1");
     assertNotNull(ref);
     assertFalse(ref.isLazilyLoaded());
+
+    Stat stat = new Stat();
+    fixture.zkClient.getData(ZkStateReader.getCollectionPath("c1"), null, stat, false);
+    assertEquals(Instant.ofEpochMilli(stat.getCtime()), ref.get().getCreationTime());
   }
 
   /**
@@ -271,6 +281,7 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 {
                 "true"),
             DocRouter.DEFAULT,
             0,
+            Instant.now(),
             PerReplicaStatesOps.getZkClientPrsSupplier(
                 fixture.zkClient, DocCollection.getCollectionPath("c1")));
     ZkWriteCommand wc = new ZkWriteCommand("c1", state);
@@ -391,6 +402,7 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 {
             Map.of(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
             DocRouter.DEFAULT,
             0,
+            Instant.now(),
             PerReplicaStatesOps.getZkClientPrsSupplier(
                 fixture.zkClient, DocCollection.getCollectionPath("c1")));
     ZkWriteCommand wc = new ZkWriteCommand("c1", state);
@@ -413,6 +425,7 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 {
             Map.of(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
             DocRouter.DEFAULT,
             ref.get().getZNodeVersion(),
+            Instant.now(),
             PerReplicaStatesOps.getZkClientPrsSupplier(
                 fixture.zkClient, DocCollection.getCollectionPath("c1")));
     wc = new ZkWriteCommand("c1", state);
@@ -436,6 +449,7 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 {
             Map.of(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
             DocRouter.DEFAULT,
             0,
+            Instant.now(),
             PerReplicaStatesOps.getZkClientPrsSupplier(
                 fixture.zkClient, DocCollection.getCollectionPath("c2")));
     ZkWriteCommand wc2 = new ZkWriteCommand("c2", state);
@@ -470,12 +484,14 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 {
 
     // create new collection
     DocCollection state =
-        new DocCollection(
+        DocCollection.create(
             "c1",
             new HashMap<>(),
             Map.of(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
             DocRouter.DEFAULT,
-            0);
+            0,
+            Instant.now(),
+            null);
     ZkWriteCommand wc = new ZkWriteCommand("c1", state);
     writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(wc), null);
     writer.writePendingUpdates();
@@ -490,12 +506,14 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 {
 
     // update the collection
     state =
-        new DocCollection(
+        DocCollection.create(
             "c1",
             new HashMap<>(),
             Map.of(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
             DocRouter.DEFAULT,
-            ref.get().getZNodeVersion());
+            ref.get().getZNodeVersion(),
+            Instant.now(),
+            null);
     wc = new ZkWriteCommand("c1", state);
     writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(wc), null);
     writer.writePendingUpdates();
@@ -511,12 +529,14 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 {
 
     fixture.zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c2", true);
     state =
-        new DocCollection(
+        DocCollection.create(
             "c2",
             new HashMap<>(),
             Map.of(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
             DocRouter.DEFAULT,
-            0);
+            0,
+            Instant.now(),
+            null);
     ZkWriteCommand wc2 = new ZkWriteCommand("c2", state);
 
     writer.enqueueUpdate(reader.getClusterState(), Arrays.asList(wc1, wc2), null);
@@ -552,6 +572,7 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 {
             Map.of(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
             DocRouter.DEFAULT,
             0,
+            Instant.now(),
             PerReplicaStatesOps.getZkClientPrsSupplier(
                 fixture.zkClient, DocCollection.getCollectionPath("c1")));
     ZkWriteCommand wc1 = new ZkWriteCommand("c1", state1);
@@ -562,6 +583,7 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 {
             Map.of(ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
             DocRouter.DEFAULT,
             0,
+            Instant.now(),
             PerReplicaStatesOps.getZkClientPrsSupplier(
                 fixture.zkClient, DocCollection.getCollectionPath("c1")));
 
@@ -617,6 +639,7 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 {
                             ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
                         DocRouter.DEFAULT,
                         currentVersion,
+                        Instant.now(),
                         PerReplicaStatesOps.getZkClientPrsSupplier(
                             fixture.zkClient, DocCollection.getCollectionPath("c1")));
                 ZkWriteCommand wc = new ZkWriteCommand("c1", state);
@@ -693,6 +716,7 @@ public class ZkStateReaderTest extends SolrTestCaseJ4 {
             Collections.singletonMap(DocCollection.CollectionStateProps.PER_REPLICA_STATE, true),
             DocRouter.DEFAULT,
             0,
+            Instant.now(),
             PerReplicaStatesOps.getZkClientPrsSupplier(
                 fixture.zkClient, DocCollection.getCollectionPath(collectionName)));
     ZkWriteCommand wc = new ZkWriteCommand(collectionName, state);
diff --git a/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateWriterTest.java b/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateWriterTest.java
index 1a2c940e8dd..7d05566675a 100644
--- a/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateWriterTest.java
+++ b/solr/core/src/test/org/apache/solr/cloud/overseer/ZkStateWriterTest.java
@@ -18,6 +18,7 @@ package org.apache.solr.cloud.overseer;
 
 import java.lang.invoke.MethodHandles;
 import java.nio.file.Path;
+import java.time.Instant;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -43,6 +44,7 @@ import org.apache.solr.common.util.Utils;
 import org.apache.solr.common.util.ZLibCompressor;
 import org.apache.solr.handler.admin.ConfigSetsHandler;
 import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.data.Stat;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.slf4j.Logger;
@@ -95,15 +97,9 @@ public class ZkStateWriterTest extends SolrTestCaseJ4 {
         Map<String, Object> props =
             Collections.singletonMap(
                 ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME);
-        ZkWriteCommand c1 =
-            new ZkWriteCommand(
-                "c1", new DocCollection("c1", new HashMap<>(), props, DocRouter.DEFAULT, 0));
-        ZkWriteCommand c2 =
-            new ZkWriteCommand(
-                "c2", new DocCollection("c2", new HashMap<>(), props, DocRouter.DEFAULT, 0));
-        ZkWriteCommand c3 =
-            new ZkWriteCommand(
-                "c3", new DocCollection("c3", new HashMap<>(), props, DocRouter.DEFAULT, 0));
+        ZkWriteCommand c1 = new ZkWriteCommand("c1", createDocCollection("c1", props));
+        ZkWriteCommand c2 = new ZkWriteCommand("c2", createDocCollection("c2", props));
+        ZkWriteCommand c3 = new ZkWriteCommand("c3", createDocCollection("c3", props));
         ZkStateWriter writer =
             new ZkStateWriter(reader, new Stats(), -1, STATE_COMPRESSION_PROVIDER);
 
@@ -161,18 +157,9 @@ public class ZkStateWriterTest extends SolrTestCaseJ4 {
         zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c3", true);
         zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/prs1", true);
 
-        ZkWriteCommand c1 =
-            new ZkWriteCommand(
-                "c1",
-                new DocCollection("c1", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0));
-        ZkWriteCommand c2 =
-            new ZkWriteCommand(
-                "c2",
-                new DocCollection("c2", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0));
-        ZkWriteCommand c3 =
-            new ZkWriteCommand(
-                "c3",
-                new DocCollection("c3", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0));
+        ZkWriteCommand c1 = new ZkWriteCommand("c1", createDocCollection("c1", new HashMap<>()));
+        ZkWriteCommand c2 = new ZkWriteCommand("c2", createDocCollection("c2", new HashMap<>()));
+        ZkWriteCommand c3 = new ZkWriteCommand("c3", createDocCollection("c3", new HashMap<>()));
         Map<String, Object> prsProps = new HashMap<>();
         prsProps.put("perReplicaState", Boolean.TRUE);
         ZkWriteCommand prs1 =
@@ -184,6 +171,7 @@ public class ZkStateWriterTest extends SolrTestCaseJ4 {
                     prsProps,
                     DocRouter.DEFAULT,
                     0,
+                    Instant.now(),
                     PerReplicaStatesOps.getZkClientPrsSupplier(
                         zkClient, DocCollection.getCollectionPath("c1"))));
         ZkStateWriter writer =
@@ -244,21 +232,9 @@ public class ZkStateWriterTest extends SolrTestCaseJ4 {
         zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c3", true);
         zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/prs1", true);
 
-        ZkWriteCommand c1 =
-            new ZkWriteCommand(
-                "c1",
-                DocCollection.create(
-                    "c1", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0, null));
-        ZkWriteCommand c2 =
-            new ZkWriteCommand(
-                "c2",
-                DocCollection.create(
-                    "c2", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0, null));
-        ZkWriteCommand c3 =
-            new ZkWriteCommand(
-                "c3",
-                DocCollection.create(
-                    "c3", new HashMap<>(), new HashMap<>(), DocRouter.DEFAULT, 0, null));
+        ZkWriteCommand c1 = new ZkWriteCommand("c1", createDocCollection("c1", new HashMap<>()));
+        ZkWriteCommand c2 = new ZkWriteCommand("c2", createDocCollection("c2", new HashMap<>()));
+        ZkWriteCommand c3 = new ZkWriteCommand("c3", createDocCollection("c3", new HashMap<>()));
         Map<String, Object> prsProps = new HashMap<>();
         prsProps.put("perReplicaState", Boolean.TRUE);
         ZkWriteCommand prs1 =
@@ -270,6 +246,7 @@ public class ZkStateWriterTest extends SolrTestCaseJ4 {
                     prsProps,
                     DocRouter.DEFAULT,
                     0,
+                    Instant.now(),
                     PerReplicaStatesOps.getZkClientPrsSupplier(
                         zkClient, DocCollection.getCollectionPath("prs1"))));
         ZkStateWriter writer =
@@ -333,16 +310,13 @@ public class ZkStateWriterTest extends SolrTestCaseJ4 {
         ZkWriteCommand c1 =
             new ZkWriteCommand(
                 "c1",
-                new DocCollection(
+                createDocCollection(
                     "c1",
-                    new HashMap<String, Slice>(),
                     Collections.singletonMap(
-                        ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
-                    DocRouter.DEFAULT,
-                    0));
+                        ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME)));
 
         writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(c1), null);
-        writer.writePendingUpdates();
+        ClusterState clusterState = writer.writePendingUpdates();
 
         Map<?, ?> map =
             (Map<?, ?>)
@@ -350,6 +324,12 @@ public class ZkStateWriterTest extends SolrTestCaseJ4 {
                     zkClient.getData(
                         ZkStateReader.COLLECTIONS_ZKNODE + "/c1/state.json", null, null, true));
         assertNotNull(map.get("c1"));
+
+        Stat stat = new Stat();
+        zkClient.getData(ZkStateReader.getCollectionPath("c1"), null, stat, false);
+        assertEquals(
+            Instant.ofEpochMilli(stat.getCtime()),
+            clusterState.getCollection("c1").getCreationTime());
       }
     } finally {
       IOUtils.close(zkClient);
@@ -389,13 +369,10 @@ public class ZkStateWriterTest extends SolrTestCaseJ4 {
         ZkWriteCommand c2 =
             new ZkWriteCommand(
                 "c2",
-                new DocCollection(
+                createDocCollection(
                     "c2",
-                    new HashMap<String, Slice>(),
                     Collections.singletonMap(
-                        ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
-                    DocRouter.DEFAULT,
-                    0));
+                        ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME)));
         state = writer.enqueueUpdate(state, Collections.singletonList(c2), null);
         assertFalse(writer.hasPendingUpdates()); // first write is flushed immediately
 
@@ -408,6 +385,12 @@ public class ZkStateWriterTest extends SolrTestCaseJ4 {
         // get the most up-to-date state
         reader.forceUpdateCollection("c2");
         state = reader.getClusterState();
+
+        Stat stat = new Stat();
+        zkClient.getData(ZkStateReader.getCollectionPath("c2"), null, stat, false);
+        assertEquals(
+            Instant.ofEpochMilli(stat.getCtime()), state.getCollection("c2").getCreationTime());
+
         log.info("Cluster state: {}", state);
         assertTrue(state.hasCollection("c2"));
         assertEquals(c2Version + 1, state.getCollection("c2").getZNodeVersion());
@@ -424,13 +407,10 @@ public class ZkStateWriterTest extends SolrTestCaseJ4 {
         ZkWriteCommand c1 =
             new ZkWriteCommand(
                 "c1",
-                new DocCollection(
+                createDocCollection(
                     "c1",
-                    new HashMap<String, Slice>(),
                     Collections.singletonMap(
-                        ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME),
-                    DocRouter.DEFAULT,
-                    0));
+                        ZkStateReader.CONFIGNAME_PROP, ConfigSetsHandler.DEFAULT_CONFIGSET_NAME)));
 
         try {
           writer.enqueueUpdate(state, Collections.singletonList(c1), null);
@@ -486,15 +466,7 @@ public class ZkStateWriterTest extends SolrTestCaseJ4 {
         zkClient.makePath(ZkStateReader.COLLECTIONS_ZKNODE + "/c1", true);
 
         // create new collection with stateFormat = 2
-        ZkWriteCommand c1 =
-            new ZkWriteCommand(
-                "c1",
-                new DocCollection(
-                    "c1",
-                    new HashMap<String, Slice>(),
-                    new HashMap<String, Object>(),
-                    DocRouter.DEFAULT,
-                    0));
+        ZkWriteCommand c1 = new ZkWriteCommand("c1", createDocCollection("c1", new HashMap<>()));
 
         writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(c1), null);
         writer.writePendingUpdates();
@@ -530,7 +502,9 @@ public class ZkStateWriterTest extends SolrTestCaseJ4 {
         }
         ZkWriteCommand c1 =
             new ZkWriteCommand(
-                "c2", new DocCollection("c2", slices, new HashMap<>(), DocRouter.DEFAULT, 0));
+                "c2",
+                DocCollection.create(
+                    "c2", slices, new HashMap<>(), DocRouter.DEFAULT, 0, Instant.now(), null));
 
         writer.enqueueUpdate(reader.getClusterState(), Collections.singletonList(c1), null);
         writer.writePendingUpdates();
@@ -549,4 +523,9 @@ public class ZkStateWriterTest extends SolrTestCaseJ4 {
       server.shutdown();
     }
   }
+
+  private DocCollection createDocCollection(String name, Map<String, Object> props) {
+    return DocCollection.create(
+        name, new HashMap<>(), props, DocRouter.DEFAULT, 0, Instant.now(), null);
+  }
 }
diff --git a/solr/solr-ref-guide/modules/deployment-guide/pages/collection-management.adoc b/solr/solr-ref-guide/modules/deployment-guide/pages/collection-management.adoc
index d1abf465394..695d196200b 100644
--- a/solr/solr-ref-guide/modules/deployment-guide/pages/collection-management.adoc
+++ b/solr/solr-ref-guide/modules/deployment-guide/pages/collection-management.adoc
@@ -1253,6 +1253,7 @@ http://localhost:8983/solr/admin/collections?action=COLSTATUS&collection=getting
     },
     "gettingstarted": {
         "znodeVersion": 16,
+        "creationTimeMillis": 1706228861003,
         "properties": {
             "nrtReplicas": "2",
             "pullReplicas": "0",
diff --git a/solr/solrj-zookeeper/src/java/org/apache/solr/client/solrj/impl/ZkClientClusterStateProvider.java b/solr/solrj-zookeeper/src/java/org/apache/solr/client/solrj/impl/ZkClientClusterStateProvider.java
index 36c5891da1e..f9d202f5949 100644
--- a/solr/solrj-zookeeper/src/java/org/apache/solr/client/solrj/impl/ZkClientClusterStateProvider.java
+++ b/solr/solrj-zookeeper/src/java/org/apache/solr/client/solrj/impl/ZkClientClusterStateProvider.java
@@ -19,6 +19,7 @@ package org.apache.solr.client.solrj.impl;
 
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
+import java.time.Instant;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
@@ -93,12 +94,18 @@ public class ZkClientClusterStateProvider
    * @param liveNodes list of live nodes
    * @param coll collection name
    * @param zkClient ZK client
+   * @param createTime creation time of the data/bytes
    * @return the ClusterState
    */
   @SuppressWarnings({"unchecked"})
   @Deprecated
   public static ClusterState createFromJsonSupportingLegacyConfigName(
-      int version, byte[] bytes, Set<String> liveNodes, String coll, SolrZkClient zkClient) {
+      int version,
+      byte[] bytes,
+      Set<String> liveNodes,
+      String coll,
+      SolrZkClient zkClient,
+      Instant createTime) {
     if (bytes == null || bytes.length == 0) {
       return new ClusterState(liveNodes, Collections.emptyMap());
     }
@@ -129,6 +136,7 @@ public class ZkClientClusterStateProvider
         version,
         stateMap,
         liveNodes,
+        createTime,
         PerReplicaStatesOps.getZkClientPrsSupplier(
             zkClient, DocCollection.getCollectionPath(coll)));
   }
diff --git a/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/SolrZkClient.java b/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/SolrZkClient.java
index 4ae9d16f123..25b16e18fc4 100644
--- a/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/SolrZkClient.java
+++ b/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/SolrZkClient.java
@@ -543,6 +543,27 @@ public class SolrZkClient implements Closeable {
     return result;
   }
 
+  /**
+   * Creates a node at {@code path} and returns the actual path of the created node.
+   *
+   * @param stat Output argument that captures created node details
+   */
+  public String create(
+      final String path,
+      final byte[] data,
+      final CreateMode createMode,
+      boolean retryOnConnLoss,
+      Stat stat)
+      throws KeeperException, InterruptedException {
+    if (retryOnConnLoss) {
+      return zkCmdExecutor.retryOperation(
+          () -> keeper.create(path, data, zkACLProvider.getACLsToAdd(path), createMode, stat));
+    } else {
+      List<ACL> acls = zkACLProvider.getACLsToAdd(path);
+      return keeper.create(path, data, acls, createMode, stat);
+    }
+  }
+
   /**
    * Creates the path in ZooKeeper, creating each node as necessary.
    *
diff --git a/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/ZkStateReader.java b/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/ZkStateReader.java
index faaa44bed94..d3f27730139 100644
--- a/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/ZkStateReader.java
+++ b/solr/solrj-zookeeper/src/java/org/apache/solr/common/cloud/ZkStateReader.java
@@ -20,6 +20,7 @@ import static java.util.Collections.emptyMap;
 import static java.util.Collections.emptySortedSet;
 
 import java.lang.invoke.MethodHandles;
+import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -1614,7 +1615,12 @@ public class ZkStateReader implements SolrCloseable {
         // TODO in Solr 10 remove that factory method
         ClusterState state =
             ZkClientClusterStateProvider.createFromJsonSupportingLegacyConfigName(
-                stat.getVersion(), data, Collections.emptySet(), coll, zkClient);
+                stat.getVersion(),
+                data,
+                Collections.emptySet(),
+                coll,
+                zkClient,
+                Instant.ofEpochMilli(stat.getCtime()));
 
         ClusterState.CollectionRef collectionRef = state.getCollectionStates().get(coll);
         return collectionRef == null ? null : collectionRef.get();
diff --git a/solr/solrj-zookeeper/src/test/org/apache/solr/common/cloud/SolrZkClientTest.java b/solr/solrj-zookeeper/src/test/org/apache/solr/common/cloud/SolrZkClientTest.java
index 9d1eacd8950..6e59fe3972a 100644
--- a/solr/solrj-zookeeper/src/test/org/apache/solr/common/cloud/SolrZkClientTest.java
+++ b/solr/solrj-zookeeper/src/test/org/apache/solr/common/cloud/SolrZkClientTest.java
@@ -35,12 +35,14 @@ import org.apache.solr.cloud.SolrCloudTestCase;
 import org.apache.solr.cloud.ZkTestServer;
 import org.apache.solr.core.SolrResourceLoader;
 import org.apache.solr.util.ExternalPaths;
+import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Id;
+import org.apache.zookeeper.data.Stat;
 import org.apache.zookeeper.server.auth.DigestAuthenticationProvider;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -313,4 +315,34 @@ public class SolrZkClientTest extends SolrCloudTestCase {
       return List.of(new ZkCredential("someuser", "somepass", ZkCredential.Perms.READ));
     }
   }
+
+  @Test
+  public void testCreateWithStat() throws InterruptedException, KeeperException {
+    String path = "/collections/" + "collectionName_" + getSaferTestName();
+    try {
+      Stat createStat = new Stat();
+      defaultClient.create(
+          path, "hello".getBytes(StandardCharsets.UTF_8), CreateMode.PERSISTENT, false, createStat);
+      Stat readStat = new Stat();
+      defaultClient.getData(path, null, readStat, false);
+      assertEquals(createStat, readStat);
+    } finally {
+      defaultClient.delete(path, 0, false);
+    }
+  }
+
+  @Test
+  public void testCreateWithStatAndRetry() throws InterruptedException, KeeperException {
+    String path = "/collections/" + "collectionName_" + getSaferTestName();
+    try {
+      Stat createStat = new Stat();
+      defaultClient.create(
+          path, "hello".getBytes(StandardCharsets.UTF_8), CreateMode.PERSISTENT, true, createStat);
+      Stat readStat = new Stat();
+      defaultClient.getData(path, null, readStat, false);
+      assertEquals(createStat, readStat);
+    } finally {
+      defaultClient.delete(path, 0, false);
+    }
+  }
 }
diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseHttpClusterStateProvider.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseHttpClusterStateProvider.java
index 74a225d8a90..d5e2d188a75 100644
--- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseHttpClusterStateProvider.java
+++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseHttpClusterStateProvider.java
@@ -21,6 +21,7 @@ import static org.apache.solr.client.solrj.impl.BaseHttpSolrClient.RemoteSolrExc
 
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
+import java.time.Instant;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -155,7 +156,12 @@ public abstract class BaseHttpClusterStateProvider implements ClusterStateProvid
     for (Map.Entry<String, Object> e : collectionsMap.entrySet()) {
       @SuppressWarnings("rawtypes")
       Map m = (Map) e.getValue();
-      cs = cs.copyWith(e.getKey(), fillPrs(znodeVersion, e, m));
+      Long creationTimeMillisFromClusterStatus = (Long) m.get("creationTimeMillis");
+      Instant creationTime =
+          creationTimeMillisFromClusterStatus == null
+              ? Instant.EPOCH
+              : Instant.ofEpochMilli(creationTimeMillisFromClusterStatus);
+      cs = cs.copyWith(e.getKey(), fillPrs(znodeVersion, e, creationTime, m));
     }
 
     if (clusterProperties != null) {
@@ -168,7 +174,8 @@ public abstract class BaseHttpClusterStateProvider implements ClusterStateProvid
   }
 
   @SuppressWarnings({"rawtypes", "unchecked"})
-  private DocCollection fillPrs(int znodeVersion, Map.Entry<String, Object> e, Map m) {
+  private DocCollection fillPrs(
+      int znodeVersion, Map.Entry<String, Object> e, Instant creationTime, Map m) {
     DocCollection.PrsSupplier prsSupplier = null;
     if (m.containsKey("PRS")) {
       Map prs = (Map) m.remove("PRS");
@@ -180,7 +187,8 @@ public abstract class BaseHttpClusterStateProvider implements ClusterStateProvid
                   (List<String>) prs.get("states"));
     }
 
-    return ClusterState.collectionFromObjects(e.getKey(), m, znodeVersion, prsSupplier);
+    return ClusterState.collectionFromObjects(
+        e.getKey(), m, znodeVersion, creationTime, prsSupplier);
   }
 
   @Override
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java b/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java
index 4418d1edaa9..83898f57631 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/ClusterState.java
@@ -20,6 +20,7 @@ import static org.apache.solr.common.util.Utils.STANDARDOBJBUILDER;
 
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
+import java.time.Instant;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -225,28 +226,35 @@ public class ClusterState implements MapWriter {
    *     Json representation of a {@link DocCollection} as written by {@link #write(JSONWriter)}. It
    *     can represent one or more collections.
    * @param liveNodes list of live nodes
+   * @param creationTime assigns this date to all {@link DocCollection} referenced by the returned
+   *     {@link ClusterState}
    * @return the ClusterState
    */
   public static ClusterState createFromJson(
-      int version, byte[] bytes, Set<String> liveNodes, DocCollection.PrsSupplier prsSupplier) {
+      int version,
+      byte[] bytes,
+      Set<String> liveNodes,
+      Instant creationTime,
+      DocCollection.PrsSupplier prsSupplier) {
     if (bytes == null || bytes.length == 0) {
       return new ClusterState(liveNodes, Collections.<String, DocCollection>emptyMap());
     }
     @SuppressWarnings({"unchecked"})
     Map<String, Object> stateMap =
         (Map<String, Object>) Utils.fromJSON(bytes, 0, bytes.length, STR_INTERNER_OBJ_BUILDER);
-    return createFromCollectionMap(version, stateMap, liveNodes, prsSupplier);
+    return createFromCollectionMap(version, stateMap, liveNodes, creationTime, prsSupplier);
   }
 
   @Deprecated
   public static ClusterState createFromJson(int version, byte[] bytes, Set<String> liveNodes) {
-    return createFromJson(version, bytes, liveNodes, null);
+    return createFromJson(version, bytes, liveNodes, Instant.EPOCH, null);
   }
 
   public static ClusterState createFromCollectionMap(
       int version,
       Map<String, Object> stateMap,
       Set<String> liveNodes,
+      Instant creationTime,
       DocCollection.PrsSupplier prsSupplier) {
     Map<String, CollectionRef> collections = CollectionUtil.newLinkedHashMap(stateMap.size());
     for (Entry<String, Object> entry : stateMap.entrySet()) {
@@ -254,7 +262,11 @@ public class ClusterState implements MapWriter {
       @SuppressWarnings({"unchecked"})
       DocCollection coll =
           collectionFromObjects(
-              collectionName, (Map<String, Object>) entry.getValue(), version, prsSupplier);
+              collectionName,
+              (Map<String, Object>) entry.getValue(),
+              version,
+              creationTime,
+              prsSupplier);
       collections.put(collectionName, new CollectionRef(coll));
     }
 
@@ -264,12 +276,16 @@ public class ClusterState implements MapWriter {
   @Deprecated
   public static ClusterState createFromCollectionMap(
       int version, Map<String, Object> stateMap, Set<String> liveNodes) {
-    return createFromCollectionMap(version, stateMap, liveNodes, null);
+    return createFromCollectionMap(version, stateMap, liveNodes, Instant.EPOCH, null);
   }
 
   // TODO move to static DocCollection.loadFromMap
   public static DocCollection collectionFromObjects(
-      String name, Map<String, Object> objs, int version, DocCollection.PrsSupplier prsSupplier) {
+      String name,
+      Map<String, Object> objs,
+      int version,
+      Instant creationTime,
+      DocCollection.PrsSupplier prsSupplier) {
     Map<String, Object> props;
     Map<String, Slice> slices;
 
@@ -304,7 +320,7 @@ public class ClusterState implements MapWriter {
       router = DocRouter.getDocRouter((String) routerProps.get("name"));
     }
 
-    return DocCollection.create(name, slices, props, router, version, prsSupplier);
+    return DocCollection.create(name, slices, props, router, version, creationTime, prsSupplier);
   }
 
   @Override
diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java b/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java
index 85409a33093..5c6db6b8da8 100644
--- a/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java
+++ b/solr/solrj/src/java/org/apache/solr/common/cloud/DocCollection.java
@@ -20,6 +20,7 @@ import static org.apache.solr.common.util.Utils.toJSONString;
 
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
+import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.EnumSet;
@@ -62,21 +63,22 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
   private final Integer numTlogReplicas;
   private final Integer numPullReplicas;
   private final Boolean readOnly;
+  private final Instant creationTime;
   private final Boolean perReplicaState;
   private final Map<String, Replica> replicaMap = new HashMap<>();
   private AtomicReference<PerReplicaStates> perReplicaStatesRef;
 
   /**
-   * @see DocCollection#create(String, Map, Map, DocRouter, int, PrsSupplier)
+   * @see DocCollection#create(String, Map, Map, DocRouter, int, Instant, PrsSupplier)
    */
   @Deprecated
   public DocCollection(
       String name, Map<String, Slice> slices, Map<String, Object> props, DocRouter router) {
-    this(name, slices, props, router, Integer.MAX_VALUE, null);
+    this(name, slices, props, router, Integer.MAX_VALUE, Instant.EPOCH, null);
   }
 
   /**
-   * @see DocCollection#create(String, Map, Map, DocRouter, int, PrsSupplier)
+   * @see DocCollection#create(String, Map, Map, DocRouter, int, Instant, PrsSupplier)
    */
   @Deprecated
   public DocCollection(
@@ -85,7 +87,7 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
       Map<String, Object> props,
       DocRouter router,
       int zkVersion) {
-    this(name, slices, props, router, zkVersion, null);
+    this(name, slices, props, router, zkVersion, Instant.EPOCH, null);
   }
 
   /**
@@ -95,7 +97,8 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
    * @param props The properties of the slice. This is used directly and a copy is not made.
    * @param zkVersion The version of the Collection node in Zookeeper (used for conditional
    *     updates).
-   * @see DocCollection#create(String, Map, Map, DocRouter, int, PrsSupplier)
+   * @param creationTime The creation time of the collection
+   * @see DocCollection#create(String, Map, Map, DocRouter, int, Instant, PrsSupplier)
    */
   private DocCollection(
       String name,
@@ -103,6 +106,7 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
       Map<String, Object> props,
       DocRouter router,
       int zkVersion,
+      Instant creationTime,
       AtomicReference<PerReplicaStates> perReplicaStatesRef) {
     super(props);
     // -1 means any version in ZK CAS, so we choose Integer.MAX_VALUE instead to avoid accidental
@@ -133,6 +137,7 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
     }
     Boolean readOnly = (Boolean) verifyProp(props, CollectionStateProps.READ_ONLY);
     this.readOnly = readOnly == null ? Boolean.FALSE : readOnly;
+    this.creationTime = creationTime;
 
     Iterator<Map.Entry<String, Slice>> iter = slices.entrySet().iterator();
 
@@ -164,6 +169,7 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
    * @param router router to partition int range into n ranges
    * @param zkVersion The version of the Collection node in Zookeeper (used for conditional
    *     updates).
+   * @param creationTime The creation time of the collection
    * @param prsSupplier optional supplier for PerReplicaStates (PRS) for PRS enabled collections
    * @return a newly constructed DocCollection
    */
@@ -173,6 +179,7 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
       Map<String, Object> props,
       DocRouter router,
       int zkVersion,
+      Instant creationTime,
       DocCollection.PrsSupplier prsSupplier) {
     boolean perReplicaState =
         (Boolean) verifyProp(props, CollectionStateProps.PER_REPLICA_STATE, Boolean.FALSE);
@@ -180,7 +187,7 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
     if (perReplicaState) {
       if (prsSupplier == null) {
         throw new IllegalArgumentException(
-            CollectionStateProps.PER_REPLICA_STATE + " = true , but prsSuppler is not provided");
+            CollectionStateProps.PER_REPLICA_STATE + " = true , but prsSupplier is not provided");
       }
 
       if (!hasAnyReplica(
@@ -200,6 +207,7 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
         props,
         router,
         zkVersion,
+        creationTime,
         perReplicaStates != null ? new AtomicReference<>(perReplicaStates) : null);
   }
 
@@ -287,7 +295,8 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
    */
   public DocCollection copyWithSlices(Map<String, Slice> slices) {
     DocCollection result =
-        new DocCollection(getName(), slices, propMap, router, znodeVersion, perReplicaStatesRef);
+        new DocCollection(
+            getName(), slices, propMap, router, znodeVersion, creationTime, perReplicaStatesRef);
     return result;
   }
 
@@ -383,6 +392,14 @@ public class DocCollection extends ZkNodeProps implements Iterable<Slice> {
     return readOnly;
   }
 
+  /**
+   * The creation time of the Collection. When this collection is read from ZooKeeper, this is the
+   * creation time of the collection node.
+   */
+  public Instant getCreationTime() {
+    return creationTime;
+  }
+
   @Override
   public String toString() {
     return "DocCollection("
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientCacheTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientCacheTest.java
index 8c3f74bdb13..9603dccbac3 100644
--- a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientCacheTest.java
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientCacheTest.java
@@ -24,6 +24,7 @@ import static org.mockito.Mockito.when;
 
 import java.net.ConnectException;
 import java.net.SocketException;
+import java.time.Instant;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -86,7 +87,8 @@ public class CloudSolrClientCacheTest extends SolrTestCaseJ4 {
                 .build()) {
       livenodes.addAll(Set.of("192.168.1.108:7574_solr", "192.168.1.108:8983_solr"));
       ClusterState cs =
-          ClusterState.createFromJson(1, coll1State.getBytes(UTF_8), Collections.emptySet(), null);
+          ClusterState.createFromJson(
+              1, coll1State.getBytes(UTF_8), Collections.emptySet(), Instant.now(), null);
       refs.put(collName, new Ref(collName));
       colls.put(collName, cs.getCollectionOrNull(collName));
       responses.put(
diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/ClusterStateProviderTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/ClusterStateProviderTest.java
new file mode 100644
index 00000000000..c181a59e7da
--- /dev/null
+++ b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/ClusterStateProviderTest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.solr.client.solrj.impl;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.List;
+import java.util.Map;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.response.CollectionAdminResponse;
+import org.apache.solr.cloud.SolrCloudTestCase;
+import org.apache.solr.common.cloud.ClusterState;
+import org.apache.solr.common.cloud.DocCollection;
+import org.apache.solr.common.util.NamedList;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/** Checks that ClusterStateProvider collections report the CLUSTERSTATUS creation time. */
+public class ClusterStateProviderTest extends SolrCloudTestCase {
+
+  /** Configures the test cluster, registering the streaming configset under the name "conf". */
+  @BeforeClass
+  public static void setupCluster() throws Exception {
+    configureCluster(1)
+        .addConfig(
+            "conf",
+            getFile("solrj")
+                .toPath()
+                .resolve("solr")
+                .resolve("configsets")
+                .resolve("streaming")
+                .resolve("conf"))
+        .configure();
+  }
+
+  /** Returns the ZooKeeper-backed provider when usually() is true, else the HTTP-backed one. */
+  private ClusterStateProvider createClusterStateProvider() throws Exception {
+    return usually() ? zkClientClusterStateProvider() : http2ClusterStateProvider();
+  }
+
+  /** Builds an {@link Http2ClusterStateProvider} from the base URL of Jetty runner 0. */
+  private ClusterStateProvider http2ClusterStateProvider() throws Exception {
+    String baseUrl = cluster.getJettySolrRunner(0).getBaseUrl().toString();
+    return new Http2ClusterStateProvider(List.of(baseUrl), null);
+  }
+
+  /** Builds a {@link ZkClientClusterStateProvider} from the cluster's ZkStateReader. */
+  private ClusterStateProvider zkClientClusterStateProvider() {
+    return new ZkClientClusterStateProvider(cluster.getZkStateReader());
+  }
+
+  @Test
+  public void testGetClusterState() throws Exception {
+    List<String> collectionNames = List.of("testGetClusterState", "testGetClusterState2");
+    for (String collectionName : collectionNames) {
+      createCollection(collectionName);
+    }
+
+    try (ClusterStateProvider provider = createClusterStateProvider()) {
+      ClusterState clusterState = provider.getClusterState();
+      for (String collectionName : collectionNames) {
+        DocCollection docCollection = clusterState.getCollection(collectionName);
+        Instant expected = getCreationTimeFromClusterStatus(collectionName);
+        assertEquals(expected, docCollection.getCreationTime());
+      }
+    }
+  }
+
+  @Test
+  public void testGetState() throws Exception {
+    final String collectionName = "testGetState";
+    createCollection(collectionName);
+
+    try (ClusterStateProvider provider = createClusterStateProvider()) {
+      DocCollection docCollection = provider.getState(collectionName).get();
+      assertNotNull(docCollection);
+      Instant expected = getCreationTimeFromClusterStatus(collectionName);
+      assertEquals(expected, docCollection.getCreationTime());
+    }
+  }
+
+  /** Creates {@code collectionName} from the "conf" configset and waits until it is active. */
+  private void createCollection(String collectionName) throws SolrServerException, IOException {
+    CollectionAdminRequest.createCollection(collectionName, "conf", 1, 0, 1, 0)
+        .process(cluster.getSolrClient());
+    cluster.waitForActiveCollection(collectionName, 1, 1);
+  }
+
+  /** Reads {@code creationTimeMillis} of the collection from a CLUSTERSTATUS response. */
+  @SuppressWarnings("unchecked")
+  private Instant getCreationTimeFromClusterStatus(String collectionName)
+      throws SolrServerException, IOException {
+    CollectionAdminRequest.ClusterStatus statusRequest = CollectionAdminRequest.getClusterStatus();
+    statusRequest.setCollectionName(collectionName);
+    CollectionAdminResponse statusResponse = statusRequest.process(cluster.getSolrClient());
+    NamedList<Object> response = statusResponse.getResponse();
+
+    NamedList<Object> clusterStatus = (NamedList<Object>) response.get("cluster");
+    NamedList<Object> collections = (NamedList<Object>) clusterStatus.get("collections");
+    Map<String, Object> collection = (Map<String, Object>) collections.get(collectionName);
+    long creationTimeMillis = (long) collection.get("creationTimeMillis");
+    return Instant.ofEpochMilli(creationTimeMillis);
+  }
+}