You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2018/10/04 09:16:58 UTC

[bookkeeper] branch master updated: Introduce lifecycle components for managing components in AutoRecovery

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 3c34166  Introduce lifecycle components for managing components in AutoRecovery
3c34166 is described below

commit 3c3416669d750b70d4f9c0e3ac1c704b77b45023
Author: Charan Reddy Guttapalem <re...@gmail.com>
AuthorDate: Thu Oct 4 02:16:47 2018 -0700

    Introduce lifecycle components for managing components in AutoRecovery
    
    
    Descriptions of the changes in this PR:
    
    
    - lifecycle components for managing components in AutoRecovery
    - expose metrics of AR in the same http admin endpoint
    
    
    Author: cguttapalem <cg...@salesforce.com>
    
    Reviewers: Enrico Olivelli <eo...@gmail.com>, Sijie Guo <si...@apache.org>
    
    This closes #1726 from reddycharan/stackar
---
 bookkeeper-server/pom.xml                          |  12 ++
 .../bookkeeper/replication/AutoRecoveryMain.java   | 154 ++++++++++++++-------
 .../java/org/apache/bookkeeper/server/Main.java    |   2 +-
 .../server/service/AutoRecoveryService.java        |  11 ++
 .../bookie/BookieInitializationTest.java           | 130 +++++++++++++++++
 5 files changed, 261 insertions(+), 48 deletions(-)

diff --git a/bookkeeper-server/pom.xml b/bookkeeper-server/pom.xml
index e154a36..e093d80 100644
--- a/bookkeeper-server/pom.xml
+++ b/bookkeeper-server/pom.xml
@@ -150,6 +150,18 @@
       <type>test-jar</type>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.bookkeeper.stats</groupId>
+      <artifactId>prometheus-metrics-provider</artifactId>
+      <version>${project.parent.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.bookkeeper.http</groupId>
+      <artifactId>vertx-http-server</artifactId>
+      <version>${project.parent.version}</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
   <build>
     <plugins>
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AutoRecoveryMain.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AutoRecoveryMain.java
index d63e19e..cd4aee2 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AutoRecoveryMain.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/AutoRecoveryMain.java
@@ -27,19 +27,26 @@ import com.google.common.annotations.VisibleForTesting;
 
 import java.io.File;
 import java.io.IOException;
+import java.lang.Thread.UncaughtExceptionHandler;
 import java.net.MalformedURLException;
+import java.util.concurrent.ExecutionException;
 
 import org.apache.bookkeeper.bookie.Bookie;
 import org.apache.bookkeeper.bookie.BookieCriticalThread;
 import org.apache.bookkeeper.bookie.ExitCode;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.common.component.ComponentStarter;
+import org.apache.bookkeeper.common.component.LifecycleComponent;
+import org.apache.bookkeeper.common.component.LifecycleComponentStack;
 import org.apache.bookkeeper.conf.ServerConfiguration;
-import org.apache.bookkeeper.http.HttpServer;
-import org.apache.bookkeeper.http.HttpServerLoader;
 import org.apache.bookkeeper.replication.ReplicationException.CompatibilityException;
 import org.apache.bookkeeper.replication.ReplicationException.UnavailableException;
+import org.apache.bookkeeper.server.conf.BookieConfiguration;
 import org.apache.bookkeeper.server.http.BKHttpServiceProvider;
+import org.apache.bookkeeper.server.service.AutoRecoveryService;
+import org.apache.bookkeeper.server.service.HttpService;
+import org.apache.bookkeeper.server.service.StatsProviderService;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.commons.cli.BasicParser;
@@ -70,6 +77,9 @@ public class AutoRecoveryMain {
     private volatile boolean shuttingDown = false;
     private volatile boolean running = false;
 
+    // Exception handler
+    private volatile UncaughtExceptionHandler uncaughtExceptionHandler = null;
+
     public AutoRecoveryMain(ServerConfiguration conf) throws IOException,
             InterruptedException, KeeperException, UnavailableException,
             CompatibilityException {
@@ -102,6 +112,9 @@ public class AutoRecoveryMain {
     public void start() throws UnavailableException {
         auditorElector.start();
         replicationWorker.start();
+        if (null != uncaughtExceptionHandler) {
+            deathWatcher.setUncaughtExceptionHandler(uncaughtExceptionHandler);
+        }
         deathWatcher.start();
         running = true;
     }
@@ -129,13 +142,6 @@ public class AutoRecoveryMain {
         shuttingDown = true;
         running = false;
         this.exitCode = exitCode;
-        try {
-            deathWatcher.interrupt();
-            deathWatcher.join();
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            LOG.warn("Interrupted shutting down auto recovery", e);
-        }
 
         try {
             auditorElector.shutdown();
@@ -158,6 +164,18 @@ public class AutoRecoveryMain {
         return exitCode;
     }
 
+    /**
+     * Currently the uncaught exception handler is used for DeathWatcher to notify
+     * lifecycle management that AutoRecovery is dead for some reason.
+     *
+     * <p>In the future, we can register this <tt>exceptionHandler</tt> to critical threads
+     * so that when those threads die, it will automatically trigger lifecycle management
+     * to shut down the process.
+     */
+    public void setExceptionHandler(UncaughtExceptionHandler exceptionHandler) {
+        this.uncaughtExceptionHandler = exceptionHandler;
+    }
+
     @VisibleForTesting
     public Auditor getAuditor() {
         return auditorElector.getAuditor();
@@ -171,7 +189,7 @@ public class AutoRecoveryMain {
     /*
      * DeathWatcher for AutoRecovery daemons.
      */
-    private static class AutoRecoveryDeathWatcher extends BookieCriticalThread {
+    private class AutoRecoveryDeathWatcher extends BookieCriticalThread {
         private int watchInterval;
         private AutoRecoveryMain autoRecoveryMain;
 
@@ -180,6 +198,13 @@ public class AutoRecoveryMain {
                     + autoRecoveryMain.conf.getBookiePort());
             this.autoRecoveryMain = autoRecoveryMain;
             watchInterval = autoRecoveryMain.conf.getDeathWatchInterval();
+            // set a default uncaught exception handler to shutdown the AutoRecovery
+            // when it notices the AutoRecovery is not running any more.
+            setUncaughtExceptionHandler((thread, cause) -> {
+                LOG.info("AutoRecoveryDeathWatcher exited loop due to uncaught exception from thread {}",
+                    thread.getName(), cause);
+                shutdown();
+            });
         }
 
         @Override
@@ -189,13 +214,20 @@ public class AutoRecoveryMain {
                     Thread.sleep(watchInterval);
                 } catch (InterruptedException ie) {
                     Thread.currentThread().interrupt();
-                    break;
                 }
                 // If any one service not running, then shutdown peer.
-                if (!autoRecoveryMain.auditorElector.isRunning()
-                    || !autoRecoveryMain.replicationWorker.isRunning()) {
-                    autoRecoveryMain.shutdown(ExitCode.SERVER_EXCEPTION);
-                    break;
+                if (!autoRecoveryMain.auditorElector.isRunning() || !autoRecoveryMain.replicationWorker.isRunning()) {
+                    LOG.info(
+                            "AutoRecoveryDeathWatcher noticed the AutoRecovery is not running any more,"
+                            + "exiting the watch loop!");
+                    /*
+                     * The death watcher has noticed that AutoRecovery is not
+                     * running any more. Throw an exception to fail the death
+                     * watcher thread; that will trigger the uncaught exception
+                     * handler to handle this "AutoRecovery not running"
+                     * situation.
+                     */
+                    throw new RuntimeException("AutoRecovery is not running any more");
                 }
             }
         }
@@ -266,45 +298,73 @@ public class AutoRecoveryMain {
     }
 
     public static void main(String[] args) {
-        ServerConfiguration conf = null;
+        int retCode = doMain(args);
+        Runtime.getRuntime().exit(retCode);
+    }
+
+    static int doMain(String[] args) {
+
+        ServerConfiguration conf;
+
+        // 0. parse command line
         try {
             conf = parseArgs(args);
         } catch (IllegalArgumentException iae) {
-            LOG.error("Error parsing command line arguments : ", iae);
-            System.err.println(iae.getMessage());
-            printUsage();
-            System.exit(ExitCode.INVALID_CONF);
+            return ExitCode.INVALID_CONF;
         }
 
+        // 1. building the component stack:
+        LifecycleComponent server;
         try {
-            final AutoRecoveryMain autoRecoveryMain = new AutoRecoveryMain(conf);
-            autoRecoveryMain.start();
-            HttpServerLoader.loadHttpServer(conf);
-            final HttpServer httpServer = HttpServerLoader.get();
-            if (conf.isHttpServerEnabled() && httpServer != null) {
-                BKHttpServiceProvider serviceProvider = new BKHttpServiceProvider.Builder()
-                    .setAutoRecovery(autoRecoveryMain)
-                    .setServerConfiguration(conf)
-                    .build();
-                httpServer.initialize(serviceProvider);
-                httpServer.startServer(conf.getHttpServerPort());
-            }
-            Runtime.getRuntime().addShutdownHook(new Thread() {
-                @Override
-                public void run() {
-                    autoRecoveryMain.shutdown();
-                    if (httpServer != null && httpServer.isRunning()) {
-                        httpServer.stopServer();
-                    }
-                    LOG.info("Shutdown AutoRecoveryMain successfully");
-                }
-            });
-            LOG.info("Register shutdown hook successfully");
-            autoRecoveryMain.join();
-            System.exit(autoRecoveryMain.getExitCode());
+            server = buildAutoRecoveryServer(new BookieConfiguration(conf));
         } catch (Exception e) {
-            LOG.error("Exception running AutoRecoveryMain : ", e);
-            System.exit(ExitCode.SERVER_EXCEPTION);
+            LOG.error("Failed to build AutoRecovery Server", e);
+            return ExitCode.SERVER_EXCEPTION;
+        }
+
+        // 2. start the server
+        try {
+            ComponentStarter.startComponent(server).get();
+        } catch (InterruptedException ie) {
+            Thread.currentThread().interrupt();
+            // the server is interrupted
+            LOG.info("AutoRecovery server is interrupted. Exiting ...");
+        } catch (ExecutionException ee) {
+            LOG.error("Error in bookie shutdown", ee.getCause());
+            return ExitCode.SERVER_EXCEPTION;
         }
+        return ExitCode.OK;
+    }
+
+    public static LifecycleComponentStack buildAutoRecoveryServer(BookieConfiguration conf) throws Exception {
+        LifecycleComponentStack.Builder serverBuilder = LifecycleComponentStack.newBuilder()
+                .withName("autorecovery-server");
+
+        // 1. build stats provider
+        StatsProviderService statsProviderService = new StatsProviderService(conf);
+        StatsLogger rootStatsLogger = statsProviderService.getStatsProvider().getStatsLogger("");
+
+        serverBuilder.addComponent(statsProviderService);
+        LOG.info("Load lifecycle component : {}", StatsProviderService.class.getName());
+
+        // 2. build AutoRecovery server
+        AutoRecoveryService autoRecoveryService = new AutoRecoveryService(conf, rootStatsLogger);
+
+        serverBuilder.addComponent(autoRecoveryService);
+        LOG.info("Load lifecycle component : {}", AutoRecoveryService.class.getName());
+
+        // 3. build http service
+        if (conf.getServerConf().isHttpServerEnabled()) {
+            BKHttpServiceProvider provider = new BKHttpServiceProvider.Builder()
+                    .setAutoRecovery(autoRecoveryService.getAutoRecoveryServer())
+                    .setServerConfiguration(conf.getServerConf())
+                    .setStatsProvider(statsProviderService.getStatsProvider()).build();
+            HttpService httpService = new HttpService(provider, conf, rootStatsLogger);
+
+            serverBuilder.addComponent(httpService);
+            LOG.info("Load lifecycle component : {}", HttpService.class.getName());
+        }
+
+        return serverBuilder.build();
     }
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/Main.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/Main.java
index ae92955..b991d31 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/Main.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/Main.java
@@ -284,7 +284,7 @@ public class Main {
      * @param conf bookie server configuration
      * @return lifecycle stack
      */
-    static LifecycleComponentStack buildBookieServer(BookieConfiguration conf) throws Exception {
+    public static LifecycleComponentStack buildBookieServer(BookieConfiguration conf) throws Exception {
         LifecycleComponentStack.Builder serverBuilder = LifecycleComponentStack.newBuilder().withName("bookie-server");
 
         // 1. build stats provider
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/service/AutoRecoveryService.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/service/AutoRecoveryService.java
index b2b8f07..f8389df 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/service/AutoRecoveryService.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/server/service/AutoRecoveryService.java
@@ -19,6 +19,8 @@
 package org.apache.bookkeeper.server.service;
 
 import java.io.IOException;
+import java.lang.Thread.UncaughtExceptionHandler;
+
 import org.apache.bookkeeper.replication.AutoRecoveryMain;
 import org.apache.bookkeeper.replication.ReplicationException.UnavailableException;
 import org.apache.bookkeeper.server.component.ServerLifecycleComponent;
@@ -42,6 +44,15 @@ public class AutoRecoveryService extends ServerLifecycleComponent {
     }
 
     @Override
+    public void setExceptionHandler(UncaughtExceptionHandler handler) {
+        main.setExceptionHandler(handler);
+    }
+
+    public AutoRecoveryMain getAutoRecoveryServer() {
+        return main;
+    }
+
+    @Override
     protected void doStart() {
         try {
             this.main.start();
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java
index 48ea817..6500e0b 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java
@@ -33,16 +33,23 @@ import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.BufferedReader;
 import java.io.BufferedWriter;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.InputStreamReader;
 import java.io.OutputStreamWriter;
 import java.net.BindException;
 import java.net.InetAddress;
+import java.net.URL;
+import java.net.URLConnection;
 import java.security.AccessControlException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.Random;
 
 import java.util.concurrent.CompletableFuture;
@@ -55,20 +62,29 @@ import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.client.BookKeeperAdmin;
 import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.common.component.ComponentStarter;
+import org.apache.bookkeeper.common.component.Lifecycle;
+import org.apache.bookkeeper.common.component.LifecycleComponent;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.conf.TestBKConfiguration;
 import org.apache.bookkeeper.discover.RegistrationManager;
+import org.apache.bookkeeper.http.HttpRouter;
+import org.apache.bookkeeper.http.HttpServerLoader;
 import org.apache.bookkeeper.meta.MetadataBookieDriver;
 import org.apache.bookkeeper.meta.zk.ZKMetadataBookieDriver;
 import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
 import org.apache.bookkeeper.proto.BookieServer;
+import org.apache.bookkeeper.replication.AutoRecoveryMain;
 import org.apache.bookkeeper.replication.ReplicationException.CompatibilityException;
 import org.apache.bookkeeper.replication.ReplicationException.UnavailableException;
+import org.apache.bookkeeper.replication.ReplicationStats;
+import org.apache.bookkeeper.server.Main;
 import org.apache.bookkeeper.server.conf.BookieConfiguration;
+import org.apache.bookkeeper.server.service.AutoRecoveryService;
 import org.apache.bookkeeper.server.service.BookieService;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.stats.prometheus.PrometheusMetricsProvider;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
 import org.apache.bookkeeper.test.PortManager;
 import org.apache.bookkeeper.tls.SecurityException;
@@ -91,6 +107,8 @@ public class BookieInitializationTest extends BookKeeperClusterTestCase {
     private static final Logger LOG = LoggerFactory
             .getLogger(BookieInitializationTest.class);
 
+    private static ObjectMapper om = new ObjectMapper();
+
     @Rule
     public final TestName runtime = new TestName();
     ZKMetadataBookieDriver driver;
@@ -495,6 +513,22 @@ public class BookieInitializationTest extends BookKeeperClusterTestCase {
         startFuture.get();
     }
 
+    @Test
+    public void testAutoRecoveryServiceExceptionHandler() throws Exception {
+        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
+        conf.setMetadataServiceUri(metadataServiceUri);
+
+        BookieConfiguration bkConf = new BookieConfiguration(conf);
+        AutoRecoveryService service = new AutoRecoveryService(bkConf, NullStatsLogger.INSTANCE);
+        CompletableFuture<Void> startFuture = ComponentStarter.startComponent(service);
+
+        // shutdown the AutoRecovery service
+        service.getAutoRecoveryServer().shutdown();
+
+        // the AutoRecovery lifecycle component should be shutdown.
+        startFuture.get();
+    }
+
     /**
      * Verify bookie server starts up on ephemeral ports.
      */
@@ -1072,4 +1106,100 @@ public class BookieInitializationTest extends BookKeeperClusterTestCase {
         }
     }
 
+    @Test
+    public void testIOVertexHTTPServerEndpointForBookieWithPrometheusProvider() throws Exception {
+        File tmpDir = createTempDir("bookie", "test");
+
+        final ServerConfiguration conf = TestBKConfiguration.newServerConfiguration()
+                .setJournalDirName(tmpDir.getPath()).setLedgerDirNames(new String[] { tmpDir.getPath() })
+                .setBookiePort(PortManager.nextFreePort()).setMetadataServiceUri(metadataServiceUri)
+                .setListeningInterface(null);
+
+        /*
+         * enable io.vertx http server
+         */
+        int nextFreePort = PortManager.nextFreePort();
+        conf.setStatsProviderClass(PrometheusMetricsProvider.class);
+        conf.setHttpServerEnabled(true);
+        conf.setProperty(HttpServerLoader.HTTP_SERVER_CLASS, "org.apache.bookkeeper.http.vertx.VertxHttpServer");
+        conf.setHttpServerPort(nextFreePort);
+
+        // 1. building the component stack:
+        LifecycleComponent server = Main.buildBookieServer(new BookieConfiguration(conf));
+        // 2. start the server
+        CompletableFuture<Void> stackComponentFuture = ComponentStarter.startComponent(server);
+        while (server.lifecycleState() != Lifecycle.State.STARTED) {
+            Thread.sleep(100);
+        }
+
+        // Now, hit the rest endpoint for metrics
+        URL url = new URL("http://localhost:" + nextFreePort + HttpRouter.METRICS);
+        URLConnection urlc = url.openConnection();
+        BufferedReader in = new BufferedReader(new InputStreamReader(urlc.getInputStream()));
+        String inputLine;
+        StringBuilder metricsStringBuilder = new StringBuilder();
+        while ((inputLine = in.readLine()) != null) {
+            metricsStringBuilder.append(inputLine);
+        }
+        in.close();
+        String metrics = metricsStringBuilder.toString();
+        // do primitive checks if metrics string contains some stats
+        assertTrue("Metrics should contain basic counters", metrics.contains(BookKeeperServerStats.BOOKIE_ADD_ENTRY));
+
+        // Now, hit the rest endpoint for configs
+        url = new URL("http://localhost:" + nextFreePort + HttpRouter.SERVER_CONFIG);
+        @SuppressWarnings("unchecked")
+        Map<String, Object> configMap = om.readValue(url, Map.class);
+        if (configMap.isEmpty() || !configMap.containsKey("bookiePort")) {
+            Assert.fail("Failed to map configurations to valid JSON entries.");
+        }
+        stackComponentFuture.cancel(true);
+    }
+
+    @Test
+    public void testIOVertexHTTPServerEndpointForARWithPrometheusProvider() throws Exception {
+        final ServerConfiguration conf = TestBKConfiguration.newServerConfiguration()
+                .setMetadataServiceUri(metadataServiceUri).setListeningInterface(null);
+
+        /*
+         * enable io.vertx http server
+         */
+        int nextFreePort = PortManager.nextFreePort();
+        conf.setStatsProviderClass(PrometheusMetricsProvider.class);
+        conf.setHttpServerEnabled(true);
+        conf.setProperty(HttpServerLoader.HTTP_SERVER_CLASS, "org.apache.bookkeeper.http.vertx.VertxHttpServer");
+        conf.setHttpServerPort(nextFreePort);
+
+        // 1. building the component stack:
+        LifecycleComponent server = AutoRecoveryMain.buildAutoRecoveryServer(new BookieConfiguration(conf));
+        // 2. start the server
+        CompletableFuture<Void> stackComponentFuture = ComponentStarter.startComponent(server);
+        while (server.lifecycleState() != Lifecycle.State.STARTED) {
+            Thread.sleep(100);
+        }
+
+        // Now, hit the rest endpoint for metrics
+        URL url = new URL("http://localhost:" + nextFreePort + HttpRouter.METRICS);
+        URLConnection urlc = url.openConnection();
+        BufferedReader in = new BufferedReader(new InputStreamReader(urlc.getInputStream()));
+        String inputLine;
+        StringBuilder metricsStringBuilder = new StringBuilder();
+        while ((inputLine = in.readLine()) != null) {
+            metricsStringBuilder.append(inputLine);
+        }
+        in.close();
+        String metrics = metricsStringBuilder.toString();
+        // do primitive checks if metrics string contains some stats
+        assertTrue("Metrics should contain basic counters",
+                metrics.contains(ReplicationStats.NUM_UNDER_REPLICATED_LEDGERS));
+
+        // Now, hit the rest endpoint for configs
+        url = new URL("http://localhost:" + nextFreePort + HttpRouter.SERVER_CONFIG);
+        @SuppressWarnings("unchecked")
+        Map<String, Object> configMap = om.readValue(url, Map.class);
+        if (configMap.isEmpty() || !configMap.containsKey("metadataServiceUri")) {
+            Assert.fail("Failed to map configurations to valid JSON entries.");
+        }
+        stackComponentFuture.cancel(true);
+    }
 }