Posted to commits@hive.apache.org by jd...@apache.org on 2017/05/11 17:32:14 UTC

hive git commit: HIVE-16637: Improve end-of-data checking for LLAP input format (Jason Dere, reviewed by Sergey Shelukhin)

Repository: hive
Updated Branches:
  refs/heads/master 8e0087942 -> 162f59242


HIVE-16637: Improve end-of-data checking for LLAP input format (Jason Dere, reviewed by Sergey Shelukhin)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/162f5924
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/162f5924
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/162f5924

Branch: refs/heads/master
Commit: 162f592425ce2242ed9be7cd805aa1d5e37b3c11
Parents: 8e00879
Author: Jason Dere <jd...@hortonworks.com>
Authored: Thu May 11 10:31:21 2017 -0700
Committer: Jason Dere <jd...@hortonworks.com>
Committed: Thu May 11 10:31:21 2017 -0700

----------------------------------------------------------------------
 .../hadoop/hive/llap/LlapBaseRecordReader.java  |  48 +++-
 .../hadoop/hive/llap/io/ChunkedInputStream.java |  97 +++++++
 .../hive/llap/io/ChunkedOutputStream.java       | 103 ++++++++
 .../hive/llap/io/TestChunkedInputStream.java    | 254 +++++++++++++++++++
 .../hive/llap/LlapOutputFormatService.java      |   5 +-
 5 files changed, 493 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/162f5924/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java b/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java
index 59dec1b..7fff147 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/LlapBaseRecordReader.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hive.llap;
 
+import java.io.BufferedInputStream;
 import java.io.Closeable;
 import java.io.EOFException;
 import java.io.IOException;
@@ -29,6 +30,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.llap.Schema;
+import org.apache.hadoop.hive.llap.io.ChunkedInputStream;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.NullWritable;
@@ -46,6 +48,7 @@ import org.slf4j.LoggerFactory;
 public class LlapBaseRecordReader<V extends WritableComparable> implements RecordReader<NullWritable, V> {
   private static final Logger LOG = LoggerFactory.getLogger(LlapBaseRecordReader.class);
 
+  protected final ChunkedInputStream cin;
   protected final DataInputStream din;
   protected final Schema schema;
   protected final Class<V> clazz;
@@ -58,7 +61,9 @@ public class LlapBaseRecordReader<V extends WritableComparable> implements Recor
 
   public LlapBaseRecordReader(InputStream in, Schema schema,
       Class<V> clazz, JobConf job, Closeable client, Closeable socket) {
-    din = new DataInputStream(in);
+    this.cin = new ChunkedInputStream(in);  // Save so we can verify end of stream
+    // We need mark support - wrap with BufferedInputStream.
+    din = new DataInputStream(new BufferedInputStream(cin));
     this.schema = schema;
     this.clazz = clazz;
     this.readerThread = Thread.currentThread();
@@ -129,19 +134,27 @@ public class LlapBaseRecordReader<V extends WritableComparable> implements Recor
       // Need a way to know what thread to interrupt, since this is a blocking thread.
       setReaderThread(Thread.currentThread());
 
-      value.readFields(din);
-      return true;
-    } catch (EOFException eof) {
-      // End of input. There should be a reader event available, or coming soon, so okay to be blocking call.
-      ReaderEvent event = getReaderEvent();
-      switch (event.getEventType()) {
-        case DONE:
-          break;
-        default:
-          throw new IOException("Expected reader event with done status, but got "
-              + event.getEventType() + " with message " + event.getMessage());
+      if (hasInput()) {
+        value.readFields(din);
+        return true;
+      } else {
+        // End of input. Confirm we got end of stream indicator from server,
+        // as well as DONE status from fragment execution.
+        if (!cin.isEndOfData()) {
+          throw new IOException("Hit end of input, but did not find expected end of data indicator");
+        }
+
+        // There should be a reader event available, or coming soon, so okay to be blocking call.
+        ReaderEvent event = getReaderEvent();
+        switch (event.getEventType()) {
+          case DONE:
+            break;
+          default:
+            throw new IOException("Expected reader event with done status, but got "
+                + event.getEventType() + " with message " + event.getMessage());
+        }
+        return false;
       }
-      return false;
     } catch (IOException io) {
       if (Thread.interrupted()) {
         // Either we were interrupted by one of:
@@ -232,6 +245,15 @@ public class LlapBaseRecordReader<V extends WritableComparable> implements Recor
     }
   }
 
+  protected boolean hasInput() throws IOException {
+    din.mark(1);
+    if (din.read() >= 0) {
+      din.reset();
+      return true;
+    }
+    return false;
+  }
+
   protected ReaderEvent getReaderEvent() throws IOException {
     try {
       ReaderEvent event = readerEvents.poll(timeout, TimeUnit.MILLISECONDS);
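
For context on the hasInput() check added above: it relies on mark()/reset() support, which is why the ChunkedInputStream is wrapped in a BufferedInputStream in the constructor. A minimal, self-contained sketch of that peek pattern (the class and method names below are illustrative, not part of this patch):

  import java.io.BufferedInputStream;
  import java.io.ByteArrayInputStream;
  import java.io.IOException;
  import java.io.InputStream;

  public class PeekDemo {
    // Returns true if at least one more byte is available, without consuming it.
    static boolean hasMoreData(InputStream in) throws IOException {
      in.mark(1);          // remember the current position; valid for one byte
      if (in.read() >= 0) {
        in.reset();        // un-read the peeked byte so the caller still sees it
        return true;
      }
      return false;        // read() returned -1: end of stream
    }

    public static void main(String[] args) throws IOException {
      InputStream in = new BufferedInputStream(new ByteArrayInputStream(new byte[]{42}));
      System.out.println(hasMoreData(in));  // true
      System.out.println(in.read());        // 42 - the peek did not consume the byte
      System.out.println(hasMoreData(in));  // false - end of stream
    }
  }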

http://git-wip-us.apache.org/repos/asf/hive/blob/162f5924/llap-common/src/java/org/apache/hadoop/hive/llap/io/ChunkedInputStream.java
----------------------------------------------------------------------
diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/io/ChunkedInputStream.java b/llap-common/src/java/org/apache/hadoop/hive/llap/io/ChunkedInputStream.java
new file mode 100644
index 0000000..4e1c65f
--- /dev/null
+++ b/llap-common/src/java/org/apache/hadoop/hive/llap/io/ChunkedInputStream.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.io;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+// Data is expected to be a series of data chunks in the form <chunk size><chunk bytes><chunk size><chunk bytes>
+// The final data chunk should be a 0-length chunk which will indicate end of input.
+public class ChunkedInputStream extends InputStream {
+
+  static final private Logger LOG = LoggerFactory.getLogger(ChunkedInputStream.class);
+
+  private DataInputStream din;
+  private int unreadBytes = 0;  // Bytes remaining in the current chunk of data
+  private byte[] singleByte = new byte[1];
+  private boolean endOfData = false;
+
+  public ChunkedInputStream(InputStream in) {
+    din = new DataInputStream(in);
+  }
+
+  @Override
+  public void close() throws IOException {
+    din.close();
+  }
+
+  @Override
+  public int read() throws IOException {
+    int bytesRead = read(singleByte, 0, 1);
+    return (bytesRead == -1) ? -1 : (singleByte[0] & 0xff);  // mask so byte values >= 0x80 are not mistaken for -1/EOF
+  }
+
+  @Override
+  public int read(byte[] b, int off, int len) throws IOException {
+    int bytesRead = 0;
+
+    if (len < 0) {
+      throw new IllegalArgumentException("Negative read length");
+    } else if (len == 0) {
+      return 0;
+    }
+
+    // If there is a current unread chunk, read from that, or else get the next chunk.
+    if (unreadBytes == 0) {
+      try {
+        // Find the next chunk size
+        unreadBytes = din.readInt();
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Chunk size " + unreadBytes);
+        }
+        if (unreadBytes == 0) {
+          LOG.debug("Hit end of data");
+          endOfData = true;
+          return -1;
+        }
+      } catch (IOException err) {
+        throw new IOException("Error while attempting to read chunk length", err);
+      }
+    }
+
+    int bytesToRead = Math.min(len, unreadBytes);
+    try {
+      din.readFully(b, off, bytesToRead);
+    } catch (IOException err) {
+      throw new IOException("Error while attempting to read " + bytesToRead + " bytes from current chunk", err);
+    }
+    unreadBytes -= bytesToRead;
+    bytesRead += bytesToRead;
+
+    return bytesRead;
+  }
+
+  public boolean isEndOfData() {
+    return endOfData;
+  }
+}
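
As a rough illustration of the framing described in the class comment - a 4-byte chunk length written with DataOutputStream.writeInt, the chunk bytes, and a terminating 0-length chunk - the sketch below builds that byte sequence by hand and reads it back through ChunkedInputStream. It is not part of this patch; it only exercises the class as defined above:

  import java.io.ByteArrayInputStream;
  import java.io.ByteArrayOutputStream;
  import java.io.DataOutputStream;
  import java.io.IOException;

  import org.apache.hadoop.hive.llap.io.ChunkedInputStream;

  public class ChunkedInputDemo {
    public static void main(String[] args) throws IOException {
      // Build <chunk size><chunk bytes><0-length chunk> by hand.
      ByteArrayOutputStream raw = new ByteArrayOutputStream();
      DataOutputStream dout = new DataOutputStream(raw);
      byte[] payload = "hello".getBytes("UTF-8");
      dout.writeInt(payload.length);  // chunk size, 4-byte big-endian int
      dout.write(payload);            // chunk bytes
      dout.writeInt(0);               // 0-length chunk signals end of data
      dout.flush();

      ChunkedInputStream cin = new ChunkedInputStream(new ByteArrayInputStream(raw.toByteArray()));
      byte[] buf = new byte[payload.length];
      int n = cin.read(buf, 0, buf.length);
      System.out.println(n + " bytes: " + new String(buf, "UTF-8"));  // 5 bytes: hello
      System.out.println(cin.read(buf, 0, buf.length));               // -1, end of input
      System.out.println(cin.isEndOfData());                          // true
      cin.close();
    }
  }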

http://git-wip-us.apache.org/repos/asf/hive/blob/162f5924/llap-common/src/java/org/apache/hadoop/hive/llap/io/ChunkedOutputStream.java
----------------------------------------------------------------------
diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/io/ChunkedOutputStream.java b/llap-common/src/java/org/apache/hadoop/hive/llap/io/ChunkedOutputStream.java
new file mode 100644
index 0000000..31815d5
--- /dev/null
+++ b/llap-common/src/java/org/apache/hadoop/hive/llap/io/ChunkedOutputStream.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.io;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+// Writes data out as a series of chunks in the form <chunk size><chunk bytes><chunk size><chunk bytes>
+// Closing the output stream will send a final 0-length chunk which will indicate end of input.
+public class ChunkedOutputStream extends OutputStream {
+
+  static final private Logger LOG = LoggerFactory.getLogger(ChunkedOutputStream.class);
+
+  private DataOutputStream dout;
+  private byte[] singleByte = new byte[1];
+  private byte[] buffer;
+  private int bufPos = 0;
+
+  public ChunkedOutputStream(OutputStream out, int bufSize) {
+    if (bufSize <= 0) {
+      throw new IllegalArgumentException("Positive bufSize required, was " + bufSize);
+    }
+    buffer = new byte[bufSize];
+    dout = new DataOutputStream(out);
+  }
+
+  @Override
+  public void write(int b) throws IOException {
+    singleByte[0] = (byte) b;
+    write(singleByte, 0, 1);
+  }
+
+  @Override
+  public void write(byte[] b, int off, int len) throws IOException {
+    int bytesWritten = 0;
+    while (bytesWritten < len) {
+      // Copy the data to the buffer
+      int bytesToWrite = Math.min(len - bytesWritten, buffer.length - bufPos);
+      System.arraycopy(b, off + bytesWritten, buffer, bufPos, bytesToWrite);
+      bytesWritten += bytesToWrite;
+      bufPos += bytesToWrite;
+
+      // If we've filled the buffer, write it out
+      if (bufPos == buffer.length) {
+        writeChunk();
+      }
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    flush();
+
+    // Write final 0-length chunk
+    writeChunk();
+
+    LOG.debug("ChunkedOutputStream: Closing underlying output stream.");
+    dout.close();
+  }
+
+  @Override
+  public void flush() throws IOException {
+    // Write any remaining bytes to the out stream.
+    if (bufPos > 0) {
+      writeChunk();
+      dout.flush();
+    }
+  }
+
+  private void writeChunk() throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Writing chunk of size " + bufPos);
+    }
+
+    // First write chunk length
+    dout.writeInt(bufPos);
+
+    // Then write chunk bytes
+    dout.write(buffer, 0, bufPos);
+
+    bufPos = 0; // reset buffer
+  }
+}
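
A companion sketch (again not part of the patch) for the writer side: writes larger than bufSize are split into bufSize-sized chunks, and close() appends the 0-length end-of-data chunk. The resulting framing is decoded here with a plain DataInputStream:

  import java.io.ByteArrayInputStream;
  import java.io.ByteArrayOutputStream;
  import java.io.DataInputStream;
  import java.io.IOException;

  import org.apache.hadoop.hive.llap.io.ChunkedOutputStream;

  public class ChunkedOutputDemo {
    public static void main(String[] args) throws IOException {
      ByteArrayOutputStream raw = new ByteArrayOutputStream();
      // A bufSize of 4 forces the 10-byte write below to be split across chunks.
      ChunkedOutputStream out = new ChunkedOutputStream(raw, 4);
      out.write(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9});
      out.close();  // flushes the last partial chunk and writes the 0-length end marker

      DataInputStream din = new DataInputStream(new ByteArrayInputStream(raw.toByteArray()));
      int chunkSize;
      while ((chunkSize = din.readInt()) > 0) {
        byte[] chunk = new byte[chunkSize];
        din.readFully(chunk);
        System.out.println("chunk of " + chunkSize + " bytes");  // prints 4, 4, 2
      }
      System.out.println("end-of-data chunk (size 0)");
    }
  }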

http://git-wip-us.apache.org/repos/asf/hive/blob/162f5924/llap-common/src/test/org/apache/hadoop/hive/llap/io/TestChunkedInputStream.java
----------------------------------------------------------------------
diff --git a/llap-common/src/test/org/apache/hadoop/hive/llap/io/TestChunkedInputStream.java b/llap-common/src/test/org/apache/hadoop/hive/llap/io/TestChunkedInputStream.java
new file mode 100644
index 0000000..47209c2
--- /dev/null
+++ b/llap-common/src/test/org/apache/hadoop/hive/llap/io/TestChunkedInputStream.java
@@ -0,0 +1,254 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.io;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.FilterInputStream;
+import java.io.FilterOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.hadoop.hive.common.type.RandomTypeUtil;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+public class TestChunkedInputStream {
+
+  static int bufferSize = 128;
+  static Random rand = new Random();
+  static String alphabet = "abcdefghijklmnopqrstuvwxyz";
+
+  static class StreamTester {
+    Exception error = null;
+
+    public Exception getError() {
+      return error;
+    }
+
+    public void setError(Exception error) {
+      this.error = error;
+    }
+  }
+
+  // Test class to write a series of values to the designated output stream
+  static class BasicUsageWriter extends StreamTester implements Runnable {
+    TestStreams streams;
+    boolean flushCout;
+    boolean closePoutEarly;
+
+    public BasicUsageWriter(TestStreams streams, boolean flushCout, boolean closePoutEarly) {
+      this.streams = streams;
+      this.flushCout = flushCout;
+      this.closePoutEarly = closePoutEarly;
+    }
+
+    @Override
+    public void run() {
+      try {
+        // Write the items to the output stream.
+        for (byte[] value: streams.values) {
+          streams.out.write(value, 0, value.length);
+        }
+
+        if (flushCout) {
+          streams.out.flush();
+        }
+        if (closePoutEarly) {
+          // Close the inner output stream before closing the outer output stream.
+          // For chunked output this means we don't write end-of-data indicator.
+          streams.pout.close();
+        }
+        // This will throw error if we close pout early.
+        streams.out.close();
+      } catch (Exception err) {
+        err.printStackTrace();
+        this.error = err;
+      }
+    }
+  }
+
+  // Test class to read a series of values from the designated input stream
+  static class BasicUsageReader extends StreamTester implements Runnable {
+    TestStreams streams;
+    boolean allValuesRead = false;
+
+    public BasicUsageReader(TestStreams streams) {
+      this.streams = streams;
+    }
+
+    // Continue reading from the input stream until the desired number of bytes has been read
+    void readFully(InputStream in, byte[] readValue, int numBytes) throws IOException {
+      int bytesRead = 0;
+      while (bytesRead < numBytes) {
+        int read = in.read(readValue, bytesRead, numBytes - bytesRead);
+        if (read <= 0) {
+          throw new IOException("Unexpected read length " + read);
+        }
+        bytesRead += read;
+      }
+    }
+
+    @Override
+    public void run() {
+      try {
+        // Read the items from the input stream and confirm they match
+        for (byte[] value : streams.values) {
+          byte[] readValue = new byte[value.length];
+          readFully(streams.in, readValue, readValue.length);
+          assertArrayEquals(value, readValue);
+        }
+
+        allValuesRead = true;
+
+        // Check that the output is done
+        assertEquals(-1, streams.in.read());
+      } catch (Exception err) {
+        err.printStackTrace();
+        this.error = err;
+      }
+    }
+  }
+
+  static class MyFilterInputStream extends FilterInputStream {
+    public MyFilterInputStream(InputStream in) {
+      super(in);
+    }
+  }
+
+  // Helper class to set up a ChunkedInput/Output stream for testing
+  static class TestStreams {
+    PipedOutputStream pout;
+    OutputStream out;
+    PipedInputStream pin;
+    InputStream in;
+    List<byte[]> values;
+
+    public TestStreams(boolean useChunkedStream) throws Exception {
+      pout = new PipedOutputStream();
+      pin = new PipedInputStream(pout);
+      if (useChunkedStream) {
+        out = new ChunkedOutputStream(pout, bufferSize);
+        in = new ChunkedInputStream(pin);
+      } else {
+        // Test behavior with non-chunked streams
+        out = new FilterOutputStream(pout);
+        in = new MyFilterInputStream(pin);
+      }
+    }
+
+    public void close() {
+      try {
+        pout.close();
+      } catch (Exception err) {
+        // ignore
+      }
+
+      try {
+        pin.close();
+      } catch (Exception err) {
+        // ignore
+      }
+    }
+  }
+
+  static void runTest(Runnable writer, Runnable reader, TestStreams streams) throws Exception {
+    Thread writerThread = new Thread(writer);
+    Thread readerThread = new Thread(reader);
+
+    writerThread.start();
+    readerThread.start();
+
+    writerThread.join();
+    readerThread.join();
+  }
+
+  @Test
+  public void testBasicUsage() throws Exception {
+    List<byte[]> values = Arrays.asList(
+        new byte[]{(byte) 1},
+        new byte[]{(byte) 2},
+        RandomTypeUtil.getRandString(rand, alphabet, 99).getBytes(),
+        RandomTypeUtil.getRandString(rand, alphabet, 1024).getBytes()
+    );
+
+    // Try the basic test with non-chunked stream
+    TestStreams nonChunkedStreams = new TestStreams(false);
+    nonChunkedStreams.values = values;
+    BasicUsageWriter writer1 = new BasicUsageWriter(nonChunkedStreams, false, false);
+    BasicUsageReader reader1 = new BasicUsageReader(nonChunkedStreams);
+    runTest(writer1, reader1, nonChunkedStreams);
+    assertTrue(reader1.allValuesRead);
+    assertNull(writer1.getError());
+    assertNull(reader1.getError());
+
+    // Try with chunked streams
+    TestStreams chunkedStreams = new TestStreams(true);
+    chunkedStreams.values = values;
+    BasicUsageWriter writer2 = new BasicUsageWriter(chunkedStreams, false, false);
+    BasicUsageReader reader2 = new BasicUsageReader(chunkedStreams);
+    runTest(writer2, reader2, chunkedStreams);
+    assertTrue(reader2.allValuesRead);
+    assertTrue(((ChunkedInputStream) chunkedStreams.in).isEndOfData());
+    assertNull(writer2.getError());
+    assertNull(reader2.getError());
+  }
+
+  @Test
+  public void testAbruptlyClosedOutput() throws Exception {
+    List<byte[]> values = Arrays.asList(
+        new byte[]{(byte) 1},
+        new byte[]{(byte) 2},
+        RandomTypeUtil.getRandString(rand, alphabet, 99).getBytes(),
+        RandomTypeUtil.getRandString(rand, alphabet, 1024).getBytes()
+    );
+
+    // Close the PipedOutputStream before we close the outermost OutputStream.
+
+    // Try non-chunked stream. There should be no issues assuming we flushed the streams before closing.
+    TestStreams nonChunkedStreams = new TestStreams(false);
+    nonChunkedStreams.values = values;
+    BasicUsageWriter writer1 = new BasicUsageWriter(nonChunkedStreams, true, true);
+    BasicUsageReader reader1 = new BasicUsageReader(nonChunkedStreams);
+    runTest(writer1, reader1, nonChunkedStreams);
+    assertTrue(reader1.allValuesRead);
+    assertNull(writer1.getError());
+    assertNull(reader1.getError());
+
+    // Try with chunked stream. Here the chunked output didn't get a chance to write the end-of-data
+    // indicator, so the chunked input does not know to stop reading.
+    TestStreams chunkedStreams = new TestStreams(true);
+    chunkedStreams.values = values;
+    BasicUsageWriter writer2 = new BasicUsageWriter(chunkedStreams, true, true);
+    BasicUsageReader reader2 = new BasicUsageReader(chunkedStreams);
+    runTest(writer2, reader2, chunkedStreams);
+    assertTrue(reader2.allValuesRead);
+    assertFalse(((ChunkedInputStream) chunkedStreams.in).isEndOfData());
+    // Closing the chunked output stream early gives an error
+    assertNotNull(writer2.getError());
+    // In this case the reader is expected to have failed at the final read() check.
+    assertNotNull(reader2.getError());
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/162f5924/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java b/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java
index 0619b79..c732ba1 100644
--- a/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java
+++ b/ql/src/java/org/apache/hadoop/hive/llap/LlapOutputFormatService.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.LlapOutputSocketInitMessage;
+import org.apache.hadoop.hive.llap.io.ChunkedOutputStream;
 import org.apache.hadoop.hive.llap.security.SecretManager;
 
 import com.google.common.base.Preconditions;
@@ -198,7 +199,9 @@ public class LlapOutputFormatService {
           HiveConf.ConfVars.LLAP_DAEMON_OUTPUT_SERVICE_MAX_PENDING_WRITES);
       @SuppressWarnings("rawtypes")
       LlapRecordWriter writer = new LlapRecordWriter(
-          new ChannelOutputStream(ctx, id, sendBufferSize, maxPendingWrites));
+          new ChunkedOutputStream(
+              new ChannelOutputStream(ctx, id, sendBufferSize, maxPendingWrites),
+              sendBufferSize));
       boolean isFailed = true;
       synchronized (lock) {
         if (!writers.containsKey(id)) {
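
With this change the server side (LlapOutputFormatService) wraps the per-fragment ChannelOutputStream in a ChunkedOutputStream, and the client side (LlapBaseRecordReader) reads through a ChunkedInputStream, so the reader can tell a clean end of data apart from a dropped connection. A rough end-to-end sketch over an in-memory pipe (the piped streams merely stand in for the socket; none of the names below are part of the patch):

  import java.io.IOException;
  import java.io.PipedInputStream;
  import java.io.PipedOutputStream;

  import org.apache.hadoop.hive.llap.io.ChunkedInputStream;
  import org.apache.hadoop.hive.llap.io.ChunkedOutputStream;

  public class ChunkedRoundTripDemo {
    public static void main(String[] args) throws Exception {
      PipedOutputStream pout = new PipedOutputStream();
      PipedInputStream pin = new PipedInputStream(pout, 1 << 16);

      Thread writer = new Thread(() -> {
        // "Server" side: chunk the payload; close() emits the end-of-data marker.
        try (ChunkedOutputStream out = new ChunkedOutputStream(pout, 128)) {
          out.write("row data from the fragment".getBytes("UTF-8"));
        } catch (IOException e) {
          e.printStackTrace();
        }
      });
      writer.start();

      // "Client" side: read until -1, then confirm the end-of-data marker was seen.
      ChunkedInputStream cin = new ChunkedInputStream(pin);
      byte[] buf = new byte[256];
      int n;
      while ((n = cin.read(buf, 0, buf.length)) != -1) {
        System.out.println("read " + n + " bytes");
      }
      System.out.println("clean end of data: " + cin.isEndOfData());  // true
      writer.join();
      cin.close();
    }
  }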