Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2015/01/27 21:58:22 UTC
[1/3] hadoop git commit: HDFS-3689. Add support for variable length block. Contributed by Jing Zhao.
Repository: hadoop
Updated Branches:
refs/heads/trunk 1e2d98a39 -> 2848db814
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2848db81/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/editsStored.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/editsStored.xml b/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/editsStored.xml
index 6e8078b..2a1b549 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/editsStored.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/editsStored.xml
@@ -1,6 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<EDITS>
- <EDITS_VERSION>-61</EDITS_VERSION>
+ <EDITS_VERSION>-62</EDITS_VERSION>
<RECORD>
<OPCODE>OP_START_LOG_SEGMENT</OPCODE>
<DATA>
@@ -13,8 +13,8 @@
<TXID>2</TXID>
<DELEGATION_KEY>
<KEY_ID>1</KEY_ID>
- <EXPIRY_DATE>1421826999207</EXPIRY_DATE>
- <KEY>ca9a0c8b240570b3</KEY>
+ <EXPIRY_DATE>1422569009939</EXPIRY_DATE>
+ <KEY>907cb34000041937</KEY>
</DELEGATION_KEY>
</DATA>
</RECORD>
@@ -24,8 +24,8 @@
<TXID>3</TXID>
<DELEGATION_KEY>
<KEY_ID>2</KEY_ID>
- <EXPIRY_DATE>1421826999210</EXPIRY_DATE>
- <KEY>833c25a6fb2b0a6f</KEY>
+ <EXPIRY_DATE>1422569009941</EXPIRY_DATE>
+ <KEY>178fa1bd83474b43</KEY>
</DELEGATION_KEY>
</DATA>
</RECORD>
@@ -37,19 +37,19 @@
<INODEID>16386</INODEID>
<PATH>/file_create</PATH>
<REPLICATION>1</REPLICATION>
- <MTIME>1421135800328</MTIME>
- <ATIME>1421135800328</ATIME>
+ <MTIME>1421877810832</MTIME>
+ <ATIME>1421877810832</ATIME>
<BLOCKSIZE>512</BLOCKSIZE>
- <CLIENT_NAME>DFSClient_NONMAPREDUCE_240777107_1</CLIENT_NAME>
+ <CLIENT_NAME>DFSClient_NONMAPREDUCE_-986598042_1</CLIENT_NAME>
<CLIENT_MACHINE>127.0.0.1</CLIENT_MACHINE>
<OVERWRITE>true</OVERWRITE>
<PERMISSION_STATUS>
- <USERNAME>shv</USERNAME>
+ <USERNAME>jing</USERNAME>
<GROUPNAME>supergroup</GROUPNAME>
<MODE>420</MODE>
</PERMISSION_STATUS>
- <RPC_CLIENTID>cb20a92a-2c2f-4305-a838-2a01c6e73e18</RPC_CLIENTID>
- <RPC_CALLID>9</RPC_CALLID>
+ <RPC_CLIENTID>1730855b-1f27-4f17-9f72-b9f92eb3a8bd</RPC_CLIENTID>
+ <RPC_CALLID>6</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
@@ -60,60 +60,93 @@
<INODEID>0</INODEID>
<PATH>/file_create</PATH>
<REPLICATION>1</REPLICATION>
- <MTIME>1421135800357</MTIME>
- <ATIME>1421135800328</ATIME>
+ <MTIME>1421877810888</MTIME>
+ <ATIME>1421877810832</ATIME>
<BLOCKSIZE>512</BLOCKSIZE>
<CLIENT_NAME></CLIENT_NAME>
<CLIENT_MACHINE></CLIENT_MACHINE>
<OVERWRITE>false</OVERWRITE>
<PERMISSION_STATUS>
- <USERNAME>shv</USERNAME>
+ <USERNAME>jing</USERNAME>
<GROUPNAME>supergroup</GROUPNAME>
<MODE>420</MODE>
</PERMISSION_STATUS>
</DATA>
</RECORD>
<RECORD>
- <OPCODE>OP_SET_STORAGE_POLICY</OPCODE>
+ <OPCODE>OP_APPEND</OPCODE>
<DATA>
<TXID>6</TXID>
<PATH>/file_create</PATH>
+ <CLIENT_NAME>DFSClient_NONMAPREDUCE_-986598042_1</CLIENT_NAME>
+ <CLIENT_MACHINE>127.0.0.1</CLIENT_MACHINE>
+ <NEWBLOCK>false</NEWBLOCK>
+ <RPC_CLIENTID>1730855b-1f27-4f17-9f72-b9f92eb3a8bd</RPC_CLIENTID>
+ <RPC_CALLID>8</RPC_CALLID>
+ </DATA>
+ </RECORD>
+ <RECORD>
+ <OPCODE>OP_CLOSE</OPCODE>
+ <DATA>
+ <TXID>7</TXID>
+ <LENGTH>0</LENGTH>
+ <INODEID>0</INODEID>
+ <PATH>/file_create</PATH>
+ <REPLICATION>1</REPLICATION>
+ <MTIME>1421877810899</MTIME>
+ <ATIME>1421877810832</ATIME>
+ <BLOCKSIZE>512</BLOCKSIZE>
+ <CLIENT_NAME></CLIENT_NAME>
+ <CLIENT_MACHINE></CLIENT_MACHINE>
+ <OVERWRITE>false</OVERWRITE>
+ <PERMISSION_STATUS>
+ <USERNAME>jing</USERNAME>
+ <GROUPNAME>supergroup</GROUPNAME>
+ <MODE>420</MODE>
+ </PERMISSION_STATUS>
+ </DATA>
+ </RECORD>
+ <RECORD>
+ <OPCODE>OP_SET_STORAGE_POLICY</OPCODE>
+ <DATA>
+ <TXID>8</TXID>
+ <PATH>/file_create</PATH>
<POLICYID>7</POLICYID>
</DATA>
</RECORD>
<RECORD>
<OPCODE>OP_RENAME_OLD</OPCODE>
<DATA>
- <TXID>7</TXID>
+ <TXID>9</TXID>
<LENGTH>0</LENGTH>
<SRC>/file_create</SRC>
<DST>/file_moved</DST>
- <TIMESTAMP>1421135800368</TIMESTAMP>
- <RPC_CLIENTID>cb20a92a-2c2f-4305-a838-2a01c6e73e18</RPC_CLIENTID>
- <RPC_CALLID>12</RPC_CALLID>
+ <TIMESTAMP>1421877810907</TIMESTAMP>
+ <RPC_CLIENTID>1730855b-1f27-4f17-9f72-b9f92eb3a8bd</RPC_CLIENTID>
+ <RPC_CALLID>11</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
<OPCODE>OP_DELETE</OPCODE>
<DATA>
- <TXID>8</TXID>
+ <TXID>10</TXID>
<LENGTH>0</LENGTH>
<PATH>/file_moved</PATH>
- <TIMESTAMP>1421135800377</TIMESTAMP>
- <RPC_CLIENTID>cb20a92a-2c2f-4305-a838-2a01c6e73e18</RPC_CLIENTID>
- <RPC_CALLID>13</RPC_CALLID>
+ <TIMESTAMP>1421877810915</TIMESTAMP>
+ <RPC_CLIENTID>1730855b-1f27-4f17-9f72-b9f92eb3a8bd</RPC_CLIENTID>
+ <RPC_CALLID>12</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
<OPCODE>OP_MKDIR</OPCODE>
<DATA>
- <TXID>9</TXID>
+ <TXID>11</TXID>
<LENGTH>0</LENGTH>
<INODEID>16387</INODEID>
<PATH>/directory_mkdir</PATH>
- <TIMESTAMP>1421135800394</TIMESTAMP>
+ <TIMESTAMP>1421877810923</TIMESTAMP>
<PERMISSION_STATUS>
- <USERNAME>shv</USERNAME>
+ <USERNAME>jing</USERNAME>
<GROUPNAME>supergroup</GROUPNAME>
<MODE>493</MODE>
</PERMISSION_STATUS>
@@ -122,94 +155,94 @@
<RECORD>
<OPCODE>OP_ALLOW_SNAPSHOT</OPCODE>
<DATA>
- <TXID>10</TXID>
+ <TXID>12</TXID>
<SNAPSHOTROOT>/directory_mkdir</SNAPSHOTROOT>
</DATA>
</RECORD>
<RECORD>
<OPCODE>OP_DISALLOW_SNAPSHOT</OPCODE>
<DATA>
- <TXID>11</TXID>
+ <TXID>13</TXID>
<SNAPSHOTROOT>/directory_mkdir</SNAPSHOTROOT>
</DATA>
</RECORD>
<RECORD>
<OPCODE>OP_ALLOW_SNAPSHOT</OPCODE>
<DATA>
- <TXID>12</TXID>
+ <TXID>14</TXID>
<SNAPSHOTROOT>/directory_mkdir</SNAPSHOTROOT>
</DATA>
</RECORD>
<RECORD>
<OPCODE>OP_CREATE_SNAPSHOT</OPCODE>
<DATA>
- <TXID>13</TXID>
+ <TXID>15</TXID>
<SNAPSHOTROOT>/directory_mkdir</SNAPSHOTROOT>
<SNAPSHOTNAME>snapshot1</SNAPSHOTNAME>
- <RPC_CLIENTID>cb20a92a-2c2f-4305-a838-2a01c6e73e18</RPC_CLIENTID>
- <RPC_CALLID>18</RPC_CALLID>
+ <RPC_CLIENTID>1730855b-1f27-4f17-9f72-b9f92eb3a8bd</RPC_CLIENTID>
+ <RPC_CALLID>17</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
<OPCODE>OP_RENAME_SNAPSHOT</OPCODE>
<DATA>
- <TXID>14</TXID>
+ <TXID>16</TXID>
<SNAPSHOTROOT>/directory_mkdir</SNAPSHOTROOT>
<SNAPSHOTOLDNAME>snapshot1</SNAPSHOTOLDNAME>
<SNAPSHOTNEWNAME>snapshot2</SNAPSHOTNEWNAME>
- <RPC_CLIENTID>cb20a92a-2c2f-4305-a838-2a01c6e73e18</RPC_CLIENTID>
- <RPC_CALLID>19</RPC_CALLID>
+ <RPC_CLIENTID>1730855b-1f27-4f17-9f72-b9f92eb3a8bd</RPC_CLIENTID>
+ <RPC_CALLID>18</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
<OPCODE>OP_DELETE_SNAPSHOT</OPCODE>
<DATA>
- <TXID>15</TXID>
+ <TXID>17</TXID>
<SNAPSHOTROOT>/directory_mkdir</SNAPSHOTROOT>
<SNAPSHOTNAME>snapshot2</SNAPSHOTNAME>
- <RPC_CLIENTID>cb20a92a-2c2f-4305-a838-2a01c6e73e18</RPC_CLIENTID>
- <RPC_CALLID>20</RPC_CALLID>
+ <RPC_CLIENTID>1730855b-1f27-4f17-9f72-b9f92eb3a8bd</RPC_CLIENTID>
+ <RPC_CALLID>19</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
<OPCODE>OP_ADD</OPCODE>
<DATA>
- <TXID>16</TXID>
+ <TXID>18</TXID>
<LENGTH>0</LENGTH>
<INODEID>16388</INODEID>
<PATH>/file_create</PATH>
<REPLICATION>1</REPLICATION>
- <MTIME>1421135800442</MTIME>
- <ATIME>1421135800442</ATIME>
+ <MTIME>1421877810946</MTIME>
+ <ATIME>1421877810946</ATIME>
<BLOCKSIZE>512</BLOCKSIZE>
- <CLIENT_NAME>DFSClient_NONMAPREDUCE_240777107_1</CLIENT_NAME>
+ <CLIENT_NAME>DFSClient_NONMAPREDUCE_-986598042_1</CLIENT_NAME>
<CLIENT_MACHINE>127.0.0.1</CLIENT_MACHINE>
<OVERWRITE>true</OVERWRITE>
<PERMISSION_STATUS>
- <USERNAME>shv</USERNAME>
+ <USERNAME>jing</USERNAME>
<GROUPNAME>supergroup</GROUPNAME>
<MODE>420</MODE>
</PERMISSION_STATUS>
- <RPC_CLIENTID>cb20a92a-2c2f-4305-a838-2a01c6e73e18</RPC_CLIENTID>
- <RPC_CALLID>21</RPC_CALLID>
+ <RPC_CLIENTID>1730855b-1f27-4f17-9f72-b9f92eb3a8bd</RPC_CLIENTID>
+ <RPC_CALLID>20</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
<OPCODE>OP_CLOSE</OPCODE>
<DATA>
- <TXID>17</TXID>
+ <TXID>19</TXID>
<LENGTH>0</LENGTH>
<INODEID>0</INODEID>
<PATH>/file_create</PATH>
<REPLICATION>1</REPLICATION>
- <MTIME>1421135800445</MTIME>
- <ATIME>1421135800442</ATIME>
+ <MTIME>1421877810948</MTIME>
+ <ATIME>1421877810946</ATIME>
<BLOCKSIZE>512</BLOCKSIZE>
<CLIENT_NAME></CLIENT_NAME>
<CLIENT_MACHINE></CLIENT_MACHINE>
<OVERWRITE>false</OVERWRITE>
<PERMISSION_STATUS>
- <USERNAME>shv</USERNAME>
+ <USERNAME>jing</USERNAME>
<GROUPNAME>supergroup</GROUPNAME>
<MODE>420</MODE>
</PERMISSION_STATUS>
@@ -218,7 +251,7 @@
<RECORD>
<OPCODE>OP_SET_REPLICATION</OPCODE>
<DATA>
- <TXID>18</TXID>
+ <TXID>20</TXID>
<PATH>/file_create</PATH>
<REPLICATION>1</REPLICATION>
</DATA>
@@ -226,7 +259,7 @@
<RECORD>
<OPCODE>OP_SET_PERMISSIONS</OPCODE>
<DATA>
- <TXID>19</TXID>
+ <TXID>21</TXID>
<SRC>/file_create</SRC>
<MODE>511</MODE>
</DATA>
@@ -234,7 +267,7 @@
<RECORD>
<OPCODE>OP_SET_OWNER</OPCODE>
<DATA>
- <TXID>20</TXID>
+ <TXID>22</TXID>
<SRC>/file_create</SRC>
<USERNAME>newOwner</USERNAME>
</DATA>
@@ -242,7 +275,7 @@
<RECORD>
<OPCODE>OP_TIMES</OPCODE>
<DATA>
- <TXID>21</TXID>
+ <TXID>23</TXID>
<LENGTH>0</LENGTH>
<PATH>/file_create</PATH>
<MTIME>1285195527000</MTIME>
@@ -252,7 +285,7 @@
<RECORD>
<OPCODE>OP_SET_QUOTA</OPCODE>
<DATA>
- <TXID>22</TXID>
+ <TXID>24</TXID>
<SRC>/directory_mkdir</SRC>
<NSQUOTA>1000</NSQUOTA>
<DSQUOTA>-1</DSQUOTA>
@@ -261,57 +294,57 @@
<RECORD>
<OPCODE>OP_RENAME</OPCODE>
<DATA>
- <TXID>23</TXID>
+ <TXID>25</TXID>
<LENGTH>0</LENGTH>
<SRC>/file_create</SRC>
<DST>/file_moved</DST>
- <TIMESTAMP>1421135800485</TIMESTAMP>
+ <TIMESTAMP>1421877810968</TIMESTAMP>
<OPTIONS>NONE</OPTIONS>
- <RPC_CLIENTID>cb20a92a-2c2f-4305-a838-2a01c6e73e18</RPC_CLIENTID>
- <RPC_CALLID>28</RPC_CALLID>
+ <RPC_CLIENTID>1730855b-1f27-4f17-9f72-b9f92eb3a8bd</RPC_CLIENTID>
+ <RPC_CALLID>27</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
<OPCODE>OP_ADD</OPCODE>
<DATA>
- <TXID>24</TXID>
+ <TXID>26</TXID>
<LENGTH>0</LENGTH>
<INODEID>16389</INODEID>
<PATH>/file_concat_target</PATH>
<REPLICATION>1</REPLICATION>
- <MTIME>1421135800495</MTIME>
- <ATIME>1421135800495</ATIME>
+ <MTIME>1421877810972</MTIME>
+ <ATIME>1421877810972</ATIME>
<BLOCKSIZE>512</BLOCKSIZE>
- <CLIENT_NAME>DFSClient_NONMAPREDUCE_240777107_1</CLIENT_NAME>
+ <CLIENT_NAME>DFSClient_NONMAPREDUCE_-986598042_1</CLIENT_NAME>
<CLIENT_MACHINE>127.0.0.1</CLIENT_MACHINE>
<OVERWRITE>true</OVERWRITE>
<PERMISSION_STATUS>
- <USERNAME>shv</USERNAME>
+ <USERNAME>jing</USERNAME>
<GROUPNAME>supergroup</GROUPNAME>
<MODE>420</MODE>
</PERMISSION_STATUS>
- <RPC_CLIENTID>cb20a92a-2c2f-4305-a838-2a01c6e73e18</RPC_CLIENTID>
- <RPC_CALLID>30</RPC_CALLID>
+ <RPC_CLIENTID>1730855b-1f27-4f17-9f72-b9f92eb3a8bd</RPC_CLIENTID>
+ <RPC_CALLID>29</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
<OPCODE>OP_ALLOCATE_BLOCK_ID</OPCODE>
<DATA>
- <TXID>25</TXID>
+ <TXID>27</TXID>
<BLOCK_ID>1073741825</BLOCK_ID>
</DATA>
</RECORD>
<RECORD>
<OPCODE>OP_SET_GENSTAMP_V2</OPCODE>
<DATA>
- <TXID>26</TXID>
+ <TXID>28</TXID>
<GENSTAMPV2>1001</GENSTAMPV2>
</DATA>
</RECORD>
<RECORD>
<OPCODE>OP_ADD_BLOCK</OPCODE>
<DATA>
- <TXID>27</TXID>
+ <TXID>29</TXID>
<PATH>/file_concat_target</PATH>
<BLOCK>
<BLOCK_ID>1073741825</BLOCK_ID>
@@ -325,21 +358,21 @@
<RECORD>
<OPCODE>OP_ALLOCATE_BLOCK_ID</OPCODE>
<DATA>
- <TXID>28</TXID>
+ <TXID>30</TXID>
<BLOCK_ID>1073741826</BLOCK_ID>
</DATA>
</RECORD>
<RECORD>
<OPCODE>OP_SET_GENSTAMP_V2</OPCODE>
<DATA>
- <TXID>29</TXID>
+ <TXID>31</TXID>
<GENSTAMPV2>1002</GENSTAMPV2>
</DATA>
</RECORD>
<RECORD>
<OPCODE>OP_ADD_BLOCK</OPCODE>
<DATA>
- <TXID>30</TXID>
+ <TXID>32</TXID>
<PATH>/file_concat_target</PATH>
<BLOCK>
<BLOCK_ID>1073741825</BLOCK_ID>
@@ -358,21 +391,21 @@
<RECORD>
<OPCODE>OP_ALLOCATE_BLOCK_ID</OPCODE>
<DATA>
- <TXID>31</TXID>
+ <TXID>33</TXID>
<BLOCK_ID>1073741827</BLOCK_ID>
</DATA>
</RECORD>
<RECORD>
<OPCODE>OP_SET_GENSTAMP_V2</OPCODE>
<DATA>
- <TXID>32</TXID>
+ <TXID>34</TXID>
<GENSTAMPV2>1003</GENSTAMPV2>
</DATA>
</RECORD>
<RECORD>
<OPCODE>OP_ADD_BLOCK</OPCODE>
<DATA>
- <TXID>33</TXID>
+ <TXID>35</TXID>
<PATH>/file_concat_target</PATH>
<BLOCK>
<BLOCK_ID>1073741826</BLOCK_ID>
@@ -391,13 +424,13 @@
<RECORD>
<OPCODE>OP_CLOSE</OPCODE>
<DATA>
- <TXID>34</TXID>
+ <TXID>36</TXID>
<LENGTH>0</LENGTH>
<INODEID>0</INODEID>
<PATH>/file_concat_target</PATH>
<REPLICATION>1</REPLICATION>
- <MTIME>1421135801050</MTIME>
- <ATIME>1421135800495</ATIME>
+ <MTIME>1421877811083</MTIME>
+ <ATIME>1421877810972</ATIME>
<BLOCKSIZE>512</BLOCKSIZE>
<CLIENT_NAME></CLIENT_NAME>
<CLIENT_MACHINE></CLIENT_MACHINE>
@@ -418,7 +451,7 @@
<GENSTAMP>1003</GENSTAMP>
</BLOCK>
<PERMISSION_STATUS>
- <USERNAME>shv</USERNAME>
+ <USERNAME>jing</USERNAME>
<GROUPNAME>supergroup</GROUPNAME>
<MODE>420</MODE>
</PERMISSION_STATUS>
@@ -427,44 +460,44 @@
<RECORD>
<OPCODE>OP_ADD</OPCODE>
<DATA>
- <TXID>35</TXID>
+ <TXID>37</TXID>
<LENGTH>0</LENGTH>
<INODEID>16390</INODEID>
<PATH>/file_concat_0</PATH>
<REPLICATION>1</REPLICATION>
- <MTIME>1421135801053</MTIME>
- <ATIME>1421135801053</ATIME>
+ <MTIME>1421877811086</MTIME>
+ <ATIME>1421877811086</ATIME>
<BLOCKSIZE>512</BLOCKSIZE>
- <CLIENT_NAME>DFSClient_NONMAPREDUCE_240777107_1</CLIENT_NAME>
+ <CLIENT_NAME>DFSClient_NONMAPREDUCE_-986598042_1</CLIENT_NAME>
<CLIENT_MACHINE>127.0.0.1</CLIENT_MACHINE>
<OVERWRITE>true</OVERWRITE>
<PERMISSION_STATUS>
- <USERNAME>shv</USERNAME>
+ <USERNAME>jing</USERNAME>
<GROUPNAME>supergroup</GROUPNAME>
<MODE>420</MODE>
</PERMISSION_STATUS>
- <RPC_CLIENTID>cb20a92a-2c2f-4305-a838-2a01c6e73e18</RPC_CLIENTID>
- <RPC_CALLID>41</RPC_CALLID>
+ <RPC_CLIENTID>1730855b-1f27-4f17-9f72-b9f92eb3a8bd</RPC_CLIENTID>
+ <RPC_CALLID>39</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
<OPCODE>OP_ALLOCATE_BLOCK_ID</OPCODE>
<DATA>
- <TXID>36</TXID>
+ <TXID>38</TXID>
<BLOCK_ID>1073741828</BLOCK_ID>
</DATA>
</RECORD>
<RECORD>
<OPCODE>OP_SET_GENSTAMP_V2</OPCODE>
<DATA>
- <TXID>37</TXID>
+ <TXID>39</TXID>
<GENSTAMPV2>1004</GENSTAMPV2>
</DATA>
</RECORD>
<RECORD>
<OPCODE>OP_ADD_BLOCK</OPCODE>
<DATA>
- <TXID>38</TXID>
+ <TXID>40</TXID>
<PATH>/file_concat_0</PATH>
<BLOCK>
<BLOCK_ID>1073741828</BLOCK_ID>
@@ -478,21 +511,21 @@
<RECORD>
<OPCODE>OP_ALLOCATE_BLOCK_ID</OPCODE>
<DATA>
- <TXID>39</TXID>
+ <TXID>41</TXID>
<BLOCK_ID>1073741829</BLOCK_ID>
</DATA>
</RECORD>
<RECORD>
<OPCODE>OP_SET_GENSTAMP_V2</OPCODE>
<DATA>
- <TXID>40</TXID>
+ <TXID>42</TXID>
<GENSTAMPV2>1005</GENSTAMPV2>
</DATA>
</RECORD>
<RECORD>
<OPCODE>OP_ADD_BLOCK</OPCODE>
<DATA>
- <TXID>41</TXID>
+ <TXID>43</TXID>
<PATH>/file_concat_0</PATH>
<BLOCK>
<BLOCK_ID>1073741828</BLOCK_ID>
@@ -511,21 +544,21 @@
<RECORD>
<OPCODE>OP_ALLOCATE_BLOCK_ID</OPCODE>
<DATA>
- <TXID>42</TXID>
+ <TXID>44</TXID>
<BLOCK_ID>1073741830</BLOCK_ID>
</DATA>
</RECORD>
<RECORD>
<OPCODE>OP_SET_GENSTAMP_V2</OPCODE>
<DATA>
- <TXID>43</TXID>
+ <TXID>45</TXID>
<GENSTAMPV2>1006</GENSTAMPV2>
</DATA>
</RECORD>
<RECORD>
<OPCODE>OP_ADD_BLOCK</OPCODE>
<DATA>
- <TXID>44</TXID>
+ <TXID>46</TXID>
<PATH>/file_concat_0</PATH>
<BLOCK>
<BLOCK_ID>1073741829</BLOCK_ID>
@@ -544,13 +577,13 @@
<RECORD>
<OPCODE>OP_CLOSE</OPCODE>
<DATA>
- <TXID>45</TXID>
+ <TXID>47</TXID>
<LENGTH>0</LENGTH>
<INODEID>0</INODEID>
<PATH>/file_concat_0</PATH>
<REPLICATION>1</REPLICATION>
- <MTIME>1421135801091</MTIME>
- <ATIME>1421135801053</ATIME>
+ <MTIME>1421877811108</MTIME>
+ <ATIME>1421877811086</ATIME>
<BLOCKSIZE>512</BLOCKSIZE>
<CLIENT_NAME></CLIENT_NAME>
<CLIENT_MACHINE></CLIENT_MACHINE>
@@ -571,7 +604,7 @@
<GENSTAMP>1006</GENSTAMP>
</BLOCK>
<PERMISSION_STATUS>
- <USERNAME>shv</USERNAME>
+ <USERNAME>jing</USERNAME>
<GROUPNAME>supergroup</GROUPNAME>
<MODE>420</MODE>
</PERMISSION_STATUS>
@@ -580,44 +613,44 @@
<RECORD>
<OPCODE>OP_ADD</OPCODE>
<DATA>
- <TXID>46</TXID>
+ <TXID>48</TXID>
<LENGTH>0</LENGTH>
<INODEID>16391</INODEID>
<PATH>/file_concat_1</PATH>
<REPLICATION>1</REPLICATION>
- <MTIME>1421135801095</MTIME>
- <ATIME>1421135801095</ATIME>
+ <MTIME>1421877811110</MTIME>
+ <ATIME>1421877811110</ATIME>
<BLOCKSIZE>512</BLOCKSIZE>
- <CLIENT_NAME>DFSClient_NONMAPREDUCE_240777107_1</CLIENT_NAME>
+ <CLIENT_NAME>DFSClient_NONMAPREDUCE_-986598042_1</CLIENT_NAME>
<CLIENT_MACHINE>127.0.0.1</CLIENT_MACHINE>
<OVERWRITE>true</OVERWRITE>
<PERMISSION_STATUS>
- <USERNAME>shv</USERNAME>
+ <USERNAME>jing</USERNAME>
<GROUPNAME>supergroup</GROUPNAME>
<MODE>420</MODE>
</PERMISSION_STATUS>
- <RPC_CLIENTID>cb20a92a-2c2f-4305-a838-2a01c6e73e18</RPC_CLIENTID>
- <RPC_CALLID>50</RPC_CALLID>
+ <RPC_CLIENTID>1730855b-1f27-4f17-9f72-b9f92eb3a8bd</RPC_CLIENTID>
+ <RPC_CALLID>48</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
<OPCODE>OP_ALLOCATE_BLOCK_ID</OPCODE>
<DATA>
- <TXID>47</TXID>
+ <TXID>49</TXID>
<BLOCK_ID>1073741831</BLOCK_ID>
</DATA>
</RECORD>
<RECORD>
<OPCODE>OP_SET_GENSTAMP_V2</OPCODE>
<DATA>
- <TXID>48</TXID>
+ <TXID>50</TXID>
<GENSTAMPV2>1007</GENSTAMPV2>
</DATA>
</RECORD>
<RECORD>
<OPCODE>OP_ADD_BLOCK</OPCODE>
<DATA>
- <TXID>49</TXID>
+ <TXID>51</TXID>
<PATH>/file_concat_1</PATH>
<BLOCK>
<BLOCK_ID>1073741831</BLOCK_ID>
@@ -631,21 +664,21 @@
<RECORD>
<OPCODE>OP_ALLOCATE_BLOCK_ID</OPCODE>
<DATA>
- <TXID>50</TXID>
+ <TXID>52</TXID>
<BLOCK_ID>1073741832</BLOCK_ID>
</DATA>
</RECORD>
<RECORD>
<OPCODE>OP_SET_GENSTAMP_V2</OPCODE>
<DATA>
- <TXID>51</TXID>
+ <TXID>53</TXID>
<GENSTAMPV2>1008</GENSTAMPV2>
</DATA>
</RECORD>
<RECORD>
<OPCODE>OP_ADD_BLOCK</OPCODE>
<DATA>
- <TXID>52</TXID>
+ <TXID>54</TXID>
<PATH>/file_concat_1</PATH>
<BLOCK>
<BLOCK_ID>1073741831</BLOCK_ID>
@@ -664,21 +697,21 @@
<RECORD>
<OPCODE>OP_ALLOCATE_BLOCK_ID</OPCODE>
<DATA>
- <TXID>53</TXID>
+ <TXID>55</TXID>
<BLOCK_ID>1073741833</BLOCK_ID>
</DATA>
</RECORD>
<RECORD>
<OPCODE>OP_SET_GENSTAMP_V2</OPCODE>
<DATA>
- <TXID>54</TXID>
+ <TXID>56</TXID>
<GENSTAMPV2>1009</GENSTAMPV2>
</DATA>
</RECORD>
<RECORD>
<OPCODE>OP_ADD_BLOCK</OPCODE>
<DATA>
- <TXID>55</TXID>
+ <TXID>57</TXID>
<PATH>/file_concat_1</PATH>
<BLOCK>
<BLOCK_ID>1073741832</BLOCK_ID>
@@ -697,13 +730,13 @@
<RECORD>
<OPCODE>OP_CLOSE</OPCODE>
<DATA>
- <TXID>56</TXID>
+ <TXID>58</TXID>
<LENGTH>0</LENGTH>
<INODEID>0</INODEID>
<PATH>/file_concat_1</PATH>
<REPLICATION>1</REPLICATION>
- <MTIME>1421135801126</MTIME>
- <ATIME>1421135801095</ATIME>
+ <MTIME>1421877811131</MTIME>
+ <ATIME>1421877811110</ATIME>
<BLOCKSIZE>512</BLOCKSIZE>
<CLIENT_NAME></CLIENT_NAME>
<CLIENT_MACHINE></CLIENT_MACHINE>
@@ -724,7 +757,7 @@
<GENSTAMP>1009</GENSTAMP>
</BLOCK>
<PERMISSION_STATUS>
- <USERNAME>shv</USERNAME>
+ <USERNAME>jing</USERNAME>
<GROUPNAME>supergroup</GROUPNAME>
<MODE>420</MODE>
</PERMISSION_STATUS>
@@ -733,59 +766,59 @@
<RECORD>
<OPCODE>OP_CONCAT_DELETE</OPCODE>
<DATA>
- <TXID>57</TXID>
+ <TXID>59</TXID>
<LENGTH>0</LENGTH>
<TRG>/file_concat_target</TRG>
- <TIMESTAMP>1421135801130</TIMESTAMP>
+ <TIMESTAMP>1421877811134</TIMESTAMP>
<SOURCES>
<SOURCE1>/file_concat_0</SOURCE1>
<SOURCE2>/file_concat_1</SOURCE2>
</SOURCES>
- <RPC_CLIENTID>cb20a92a-2c2f-4305-a838-2a01c6e73e18</RPC_CLIENTID>
- <RPC_CALLID>58</RPC_CALLID>
+ <RPC_CLIENTID>1730855b-1f27-4f17-9f72-b9f92eb3a8bd</RPC_CLIENTID>
+ <RPC_CALLID>56</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
<OPCODE>OP_ADD</OPCODE>
<DATA>
- <TXID>58</TXID>
+ <TXID>60</TXID>
<LENGTH>0</LENGTH>
<INODEID>16392</INODEID>
<PATH>/file_create</PATH>
<REPLICATION>1</REPLICATION>
- <MTIME>1421135810102</MTIME>
- <ATIME>1421135810102</ATIME>
+ <MTIME>1421877811137</MTIME>
+ <ATIME>1421877811137</ATIME>
<BLOCKSIZE>512</BLOCKSIZE>
- <CLIENT_NAME>DFSClient_NONMAPREDUCE_240777107_1</CLIENT_NAME>
+ <CLIENT_NAME>DFSClient_NONMAPREDUCE_-986598042_1</CLIENT_NAME>
<CLIENT_MACHINE>127.0.0.1</CLIENT_MACHINE>
<OVERWRITE>true</OVERWRITE>
<PERMISSION_STATUS>
- <USERNAME>shv</USERNAME>
+ <USERNAME>jing</USERNAME>
<GROUPNAME>supergroup</GROUPNAME>
<MODE>420</MODE>
</PERMISSION_STATUS>
- <RPC_CLIENTID>cb20a92a-2c2f-4305-a838-2a01c6e73e18</RPC_CLIENTID>
- <RPC_CALLID>63</RPC_CALLID>
+ <RPC_CLIENTID>1730855b-1f27-4f17-9f72-b9f92eb3a8bd</RPC_CLIENTID>
+ <RPC_CALLID>58</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
<OPCODE>OP_ALLOCATE_BLOCK_ID</OPCODE>
<DATA>
- <TXID>59</TXID>
+ <TXID>61</TXID>
<BLOCK_ID>1073741834</BLOCK_ID>
</DATA>
</RECORD>
<RECORD>
<OPCODE>OP_SET_GENSTAMP_V2</OPCODE>
<DATA>
- <TXID>60</TXID>
+ <TXID>62</TXID>
<GENSTAMPV2>1010</GENSTAMPV2>
</DATA>
</RECORD>
<RECORD>
<OPCODE>OP_ADD_BLOCK</OPCODE>
<DATA>
- <TXID>61</TXID>
+ <TXID>63</TXID>
<PATH>/file_create</PATH>
<BLOCK>
<BLOCK_ID>1073741834</BLOCK_ID>
@@ -799,21 +832,21 @@
<RECORD>
<OPCODE>OP_ALLOCATE_BLOCK_ID</OPCODE>
<DATA>
- <TXID>62</TXID>
+ <TXID>64</TXID>
<BLOCK_ID>1073741835</BLOCK_ID>
</DATA>
</RECORD>
<RECORD>
<OPCODE>OP_SET_GENSTAMP_V2</OPCODE>
<DATA>
- <TXID>63</TXID>
+ <TXID>65</TXID>
<GENSTAMPV2>1011</GENSTAMPV2>
</DATA>
</RECORD>
<RECORD>
<OPCODE>OP_ADD_BLOCK</OPCODE>
<DATA>
- <TXID>64</TXID>
+ <TXID>66</TXID>
<PATH>/file_create</PATH>
<BLOCK>
<BLOCK_ID>1073741834</BLOCK_ID>
@@ -832,13 +865,13 @@
<RECORD>
<OPCODE>OP_CLOSE</OPCODE>
<DATA>
- <TXID>65</TXID>
+ <TXID>67</TXID>
<LENGTH>0</LENGTH>
<INODEID>0</INODEID>
<PATH>/file_create</PATH>
<REPLICATION>1</REPLICATION>
- <MTIME>1421135810122</MTIME>
- <ATIME>1421135810102</ATIME>
+ <MTIME>1421877811152</MTIME>
+ <ATIME>1421877811137</ATIME>
<BLOCKSIZE>512</BLOCKSIZE>
<CLIENT_NAME></CLIENT_NAME>
<CLIENT_MACHINE></CLIENT_MACHINE>
@@ -854,7 +887,7 @@
<GENSTAMP>1011</GENSTAMP>
</BLOCK>
<PERMISSION_STATUS>
- <USERNAME>shv</USERNAME>
+ <USERNAME>jing</USERNAME>
<GROUPNAME>supergroup</GROUPNAME>
<MODE>420</MODE>
</PERMISSION_STATUS>
@@ -863,74 +896,74 @@
<RECORD>
<OPCODE>OP_TRUNCATE</OPCODE>
<DATA>
- <TXID>66</TXID>
+ <TXID>68</TXID>
<SRC>/file_create</SRC>
- <CLIENTNAME>DFSClient_NONMAPREDUCE_240777107_1</CLIENTNAME>
+ <CLIENTNAME>DFSClient_NONMAPREDUCE_-986598042_1</CLIENTNAME>
<CLIENTMACHINE>127.0.0.1</CLIENTMACHINE>
<NEWLENGTH>512</NEWLENGTH>
- <TIMESTAMP>1421135810125</TIMESTAMP>
+ <TIMESTAMP>1421877811154</TIMESTAMP>
</DATA>
</RECORD>
<RECORD>
<OPCODE>OP_SYMLINK</OPCODE>
<DATA>
- <TXID>67</TXID>
+ <TXID>69</TXID>
<LENGTH>0</LENGTH>
<INODEID>16393</INODEID>
<PATH>/file_symlink</PATH>
<VALUE>/file_concat_target</VALUE>
- <MTIME>1421135810132</MTIME>
- <ATIME>1421135810132</ATIME>
+ <MTIME>1421877811160</MTIME>
+ <ATIME>1421877811160</ATIME>
<PERMISSION_STATUS>
- <USERNAME>shv</USERNAME>
+ <USERNAME>jing</USERNAME>
<GROUPNAME>supergroup</GROUPNAME>
<MODE>511</MODE>
</PERMISSION_STATUS>
- <RPC_CLIENTID>cb20a92a-2c2f-4305-a838-2a01c6e73e18</RPC_CLIENTID>
- <RPC_CALLID>70</RPC_CALLID>
+ <RPC_CLIENTID>1730855b-1f27-4f17-9f72-b9f92eb3a8bd</RPC_CLIENTID>
+ <RPC_CALLID>65</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
<OPCODE>OP_ADD</OPCODE>
<DATA>
- <TXID>68</TXID>
+ <TXID>70</TXID>
<LENGTH>0</LENGTH>
<INODEID>16394</INODEID>
<PATH>/hard-lease-recovery-test</PATH>
<REPLICATION>1</REPLICATION>
- <MTIME>1421135810135</MTIME>
- <ATIME>1421135810135</ATIME>
+ <MTIME>1421877811163</MTIME>
+ <ATIME>1421877811163</ATIME>
<BLOCKSIZE>512</BLOCKSIZE>
- <CLIENT_NAME>DFSClient_NONMAPREDUCE_240777107_1</CLIENT_NAME>
+ <CLIENT_NAME>DFSClient_NONMAPREDUCE_-986598042_1</CLIENT_NAME>
<CLIENT_MACHINE>127.0.0.1</CLIENT_MACHINE>
<OVERWRITE>true</OVERWRITE>
<PERMISSION_STATUS>
- <USERNAME>shv</USERNAME>
+ <USERNAME>jing</USERNAME>
<GROUPNAME>supergroup</GROUPNAME>
<MODE>420</MODE>
</PERMISSION_STATUS>
- <RPC_CLIENTID>cb20a92a-2c2f-4305-a838-2a01c6e73e18</RPC_CLIENTID>
- <RPC_CALLID>71</RPC_CALLID>
+ <RPC_CLIENTID>1730855b-1f27-4f17-9f72-b9f92eb3a8bd</RPC_CLIENTID>
+ <RPC_CALLID>66</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
<OPCODE>OP_ALLOCATE_BLOCK_ID</OPCODE>
<DATA>
- <TXID>69</TXID>
+ <TXID>71</TXID>
<BLOCK_ID>1073741836</BLOCK_ID>
</DATA>
</RECORD>
<RECORD>
<OPCODE>OP_SET_GENSTAMP_V2</OPCODE>
<DATA>
- <TXID>70</TXID>
+ <TXID>72</TXID>
<GENSTAMPV2>1012</GENSTAMPV2>
</DATA>
</RECORD>
<RECORD>
<OPCODE>OP_ADD_BLOCK</OPCODE>
<DATA>
- <TXID>71</TXID>
+ <TXID>73</TXID>
<PATH>/hard-lease-recovery-test</PATH>
<BLOCK>
<BLOCK_ID>1073741836</BLOCK_ID>
@@ -944,7 +977,7 @@
<RECORD>
<OPCODE>OP_UPDATE_BLOCKS</OPCODE>
<DATA>
- <TXID>72</TXID>
+ <TXID>74</TXID>
<PATH>/hard-lease-recovery-test</PATH>
<BLOCK>
<BLOCK_ID>1073741836</BLOCK_ID>
@@ -958,15 +991,15 @@
<RECORD>
<OPCODE>OP_SET_GENSTAMP_V2</OPCODE>
<DATA>
- <TXID>73</TXID>
+ <TXID>75</TXID>
<GENSTAMPV2>1013</GENSTAMPV2>
</DATA>
</RECORD>
<RECORD>
<OPCODE>OP_REASSIGN_LEASE</OPCODE>
<DATA>
- <TXID>74</TXID>
- <LEASEHOLDER>DFSClient_NONMAPREDUCE_240777107_1</LEASEHOLDER>
+ <TXID>76</TXID>
+ <LEASEHOLDER>DFSClient_NONMAPREDUCE_-986598042_1</LEASEHOLDER>
<PATH>/hard-lease-recovery-test</PATH>
<NEWHOLDER>HDFS_NameNode</NEWHOLDER>
</DATA>
@@ -974,13 +1007,13 @@
<RECORD>
<OPCODE>OP_CLOSE</OPCODE>
<DATA>
- <TXID>75</TXID>
+ <TXID>77</TXID>
<LENGTH>0</LENGTH>
<INODEID>0</INODEID>
<PATH>/hard-lease-recovery-test</PATH>
<REPLICATION>1</REPLICATION>
- <MTIME>1421135812235</MTIME>
- <ATIME>1421135810135</ATIME>
+ <MTIME>1421877813736</MTIME>
+ <ATIME>1421877811163</ATIME>
<BLOCKSIZE>512</BLOCKSIZE>
<CLIENT_NAME></CLIENT_NAME>
<CLIENT_MACHINE></CLIENT_MACHINE>
@@ -991,7 +1024,7 @@
<GENSTAMP>1013</GENSTAMP>
</BLOCK>
<PERMISSION_STATUS>
- <USERNAME>shv</USERNAME>
+ <USERNAME>jing</USERNAME>
<GROUPNAME>supergroup</GROUPNAME>
<MODE>420</MODE>
</PERMISSION_STATUS>
@@ -1000,72 +1033,72 @@
<RECORD>
<OPCODE>OP_ADD_CACHE_POOL</OPCODE>
<DATA>
- <TXID>76</TXID>
+ <TXID>78</TXID>
<POOLNAME>pool1</POOLNAME>
- <OWNERNAME>shv</OWNERNAME>
- <GROUPNAME>shv</GROUPNAME>
+ <OWNERNAME>jing</OWNERNAME>
+ <GROUPNAME>staff</GROUPNAME>
<MODE>493</MODE>
<LIMIT>9223372036854775807</LIMIT>
<MAXRELATIVEEXPIRY>2305843009213693951</MAXRELATIVEEXPIRY>
- <RPC_CLIENTID>cb20a92a-2c2f-4305-a838-2a01c6e73e18</RPC_CLIENTID>
- <RPC_CALLID>78</RPC_CALLID>
+ <RPC_CLIENTID>1730855b-1f27-4f17-9f72-b9f92eb3a8bd</RPC_CLIENTID>
+ <RPC_CALLID>73</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
<OPCODE>OP_MODIFY_CACHE_POOL</OPCODE>
<DATA>
- <TXID>77</TXID>
+ <TXID>79</TXID>
<POOLNAME>pool1</POOLNAME>
<LIMIT>99</LIMIT>
- <RPC_CLIENTID>cb20a92a-2c2f-4305-a838-2a01c6e73e18</RPC_CLIENTID>
- <RPC_CALLID>79</RPC_CALLID>
+ <RPC_CLIENTID>1730855b-1f27-4f17-9f72-b9f92eb3a8bd</RPC_CLIENTID>
+ <RPC_CALLID>74</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
<OPCODE>OP_ADD_CACHE_DIRECTIVE</OPCODE>
<DATA>
- <TXID>78</TXID>
+ <TXID>80</TXID>
<ID>1</ID>
<PATH>/path</PATH>
<REPLICATION>1</REPLICATION>
<POOL>pool1</POOL>
- <EXPIRATION>2305844430349507141</EXPIRATION>
- <RPC_CLIENTID>cb20a92a-2c2f-4305-a838-2a01c6e73e18</RPC_CLIENTID>
- <RPC_CALLID>80</RPC_CALLID>
+ <EXPIRATION>2305844431091508160</EXPIRATION>
+ <RPC_CLIENTID>1730855b-1f27-4f17-9f72-b9f92eb3a8bd</RPC_CLIENTID>
+ <RPC_CALLID>75</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
<OPCODE>OP_MODIFY_CACHE_DIRECTIVE</OPCODE>
<DATA>
- <TXID>79</TXID>
+ <TXID>81</TXID>
<ID>1</ID>
<REPLICATION>2</REPLICATION>
- <RPC_CLIENTID>cb20a92a-2c2f-4305-a838-2a01c6e73e18</RPC_CLIENTID>
- <RPC_CALLID>81</RPC_CALLID>
+ <RPC_CLIENTID>1730855b-1f27-4f17-9f72-b9f92eb3a8bd</RPC_CLIENTID>
+ <RPC_CALLID>76</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
<OPCODE>OP_REMOVE_CACHE_DIRECTIVE</OPCODE>
<DATA>
- <TXID>80</TXID>
+ <TXID>82</TXID>
<ID>1</ID>
- <RPC_CLIENTID>cb20a92a-2c2f-4305-a838-2a01c6e73e18</RPC_CLIENTID>
- <RPC_CALLID>82</RPC_CALLID>
+ <RPC_CLIENTID>1730855b-1f27-4f17-9f72-b9f92eb3a8bd</RPC_CLIENTID>
+ <RPC_CALLID>77</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
<OPCODE>OP_REMOVE_CACHE_POOL</OPCODE>
<DATA>
- <TXID>81</TXID>
+ <TXID>83</TXID>
<POOLNAME>pool1</POOLNAME>
- <RPC_CLIENTID>cb20a92a-2c2f-4305-a838-2a01c6e73e18</RPC_CLIENTID>
- <RPC_CALLID>83</RPC_CALLID>
+ <RPC_CLIENTID>1730855b-1f27-4f17-9f72-b9f92eb3a8bd</RPC_CLIENTID>
+ <RPC_CALLID>78</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
<OPCODE>OP_SET_ACL</OPCODE>
<DATA>
- <TXID>82</TXID>
+ <TXID>84</TXID>
<SRC>/file_concat_target</SRC>
<ENTRY>
<SCOPE>ACCESS</SCOPE>
@@ -1098,62 +1131,62 @@
<RECORD>
<OPCODE>OP_SET_XATTR</OPCODE>
<DATA>
- <TXID>83</TXID>
+ <TXID>85</TXID>
<SRC>/file_concat_target</SRC>
<XATTR>
<NAMESPACE>USER</NAMESPACE>
<NAME>a1</NAME>
<VALUE>0x313233</VALUE>
</XATTR>
- <RPC_CLIENTID>cb20a92a-2c2f-4305-a838-2a01c6e73e18</RPC_CLIENTID>
- <RPC_CALLID>85</RPC_CALLID>
+ <RPC_CLIENTID>1730855b-1f27-4f17-9f72-b9f92eb3a8bd</RPC_CLIENTID>
+ <RPC_CALLID>80</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
<OPCODE>OP_SET_XATTR</OPCODE>
<DATA>
- <TXID>84</TXID>
+ <TXID>86</TXID>
<SRC>/file_concat_target</SRC>
<XATTR>
<NAMESPACE>USER</NAMESPACE>
<NAME>a2</NAME>
<VALUE>0x373839</VALUE>
</XATTR>
- <RPC_CLIENTID>cb20a92a-2c2f-4305-a838-2a01c6e73e18</RPC_CLIENTID>
- <RPC_CALLID>86</RPC_CALLID>
+ <RPC_CLIENTID>1730855b-1f27-4f17-9f72-b9f92eb3a8bd</RPC_CLIENTID>
+ <RPC_CALLID>81</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
<OPCODE>OP_REMOVE_XATTR</OPCODE>
<DATA>
- <TXID>85</TXID>
+ <TXID>87</TXID>
<SRC>/file_concat_target</SRC>
<XATTR>
<NAMESPACE>USER</NAMESPACE>
<NAME>a2</NAME>
</XATTR>
- <RPC_CLIENTID>cb20a92a-2c2f-4305-a838-2a01c6e73e18</RPC_CLIENTID>
- <RPC_CALLID>87</RPC_CALLID>
+ <RPC_CLIENTID>1730855b-1f27-4f17-9f72-b9f92eb3a8bd</RPC_CLIENTID>
+ <RPC_CALLID>82</RPC_CALLID>
</DATA>
</RECORD>
<RECORD>
<OPCODE>OP_ROLLING_UPGRADE_START</OPCODE>
<DATA>
- <TXID>86</TXID>
- <STARTTIME>1421135813268</STARTTIME>
+ <TXID>88</TXID>
+ <STARTTIME>1421877814254</STARTTIME>
</DATA>
</RECORD>
<RECORD>
<OPCODE>OP_ROLLING_UPGRADE_FINALIZE</OPCODE>
<DATA>
- <TXID>87</TXID>
- <FINALIZETIME>1421135813268</FINALIZETIME>
+ <TXID>89</TXID>
+ <FINALIZETIME>1421877814254</FINALIZETIME>
</DATA>
</RECORD>
<RECORD>
<OPCODE>OP_END_LOG_SEGMENT</OPCODE>
<DATA>
- <TXID>88</TXID>
+ <TXID>90</TXID>
</DATA>
</RECORD>
</EDITS>
[3/3] hadoop git commit: HDFS-3689. Add support for variable length block. Contributed by Jing Zhao.
Posted by ji...@apache.org.
HDFS-3689. Add support for variable length block. Contributed by Jing Zhao.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/2848db81
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/2848db81
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/2848db81
Branch: refs/heads/trunk
Commit: 2848db814a98b83e7546f65a2751e56fb5b2dbe0
Parents: 1e2d98a
Author: Jing Zhao <ji...@apache.org>
Authored: Tue Jan 27 12:58:10 2015 -0800
Committer: Jing Zhao <ji...@apache.org>
Committed: Tue Jan 27 12:58:10 2015 -0800
----------------------------------------------------------------------
.../java/org/apache/hadoop/fs/CreateFlag.java | 24 +-
.../org/apache/hadoop/fs/FSOutputSummer.java | 2 +-
.../hadoop/hdfs/nfs/nfs3/WriteManager.java | 5 +-
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 2 +
.../java/org/apache/hadoop/hdfs/DFSClient.java | 48 +-
.../org/apache/hadoop/hdfs/DFSOutputStream.java | 69 +--
.../hadoop/hdfs/DistributedFileSystem.java | 10 +-
.../hdfs/client/HdfsDataOutputStream.java | 8 +-
.../org/apache/hadoop/hdfs/inotify/Event.java | 12 +
.../hadoop/hdfs/protocol/ClientProtocol.java | 9 +-
...tNamenodeProtocolServerSideTranslatorPB.java | 14 +-
.../ClientNamenodeProtocolTranslatorPB.java | 17 +-
.../apache/hadoop/hdfs/protocolPB/PBHelper.java | 25 +-
.../datanode/web/webhdfs/WebHdfsHandler.java | 3 +-
.../hdfs/server/namenode/FSDirConcatOp.java | 259 +++++------
.../hdfs/server/namenode/FSDirectory.java | 4 +-
.../hadoop/hdfs/server/namenode/FSEditLog.java | 20 +-
.../hdfs/server/namenode/FSEditLogLoader.java | 64 ++-
.../hdfs/server/namenode/FSEditLogOp.java | 101 ++++-
.../hdfs/server/namenode/FSEditLogOpCodes.java | 1 +
.../hdfs/server/namenode/FSNamesystem.java | 56 ++-
.../hadoop/hdfs/server/namenode/INodeFile.java | 2 +-
.../namenode/InotifyFSEditLogOpTranslator.java | 4 +
.../server/namenode/NameNodeLayoutVersion.java | 3 +-
.../hdfs/server/namenode/NameNodeRpcServer.java | 9 +-
.../src/main/proto/ClientNamenodeProtocol.proto | 2 +
.../hadoop-hdfs/src/main/proto/inotify.proto | 1 +
.../org/apache/hadoop/hdfs/AppendTestUtil.java | 16 +
.../org/apache/hadoop/hdfs/DFSTestUtil.java | 3 +
.../hdfs/TestDFSInotifyEventInputStream.java | 9 +-
.../org/apache/hadoop/hdfs/TestFileAppend.java | 162 +++++++
.../org/apache/hadoop/hdfs/TestFileAppend2.java | 193 +++++++-
.../org/apache/hadoop/hdfs/TestFileAppend3.java | 212 +++++++--
.../hadoop/hdfs/TestFileAppendRestart.java | 10 +-
.../java/org/apache/hadoop/hdfs/TestHFlush.java | 128 +++++-
.../apache/hadoop/hdfs/TestLeaseRecovery.java | 6 +-
.../fsdataset/impl/TestLazyPersistFiles.java | 5 +-
.../hdfs/server/namenode/TestHDFSConcat.java | 78 +++-
.../server/namenode/TestNamenodeRetryCache.java | 16 +-
.../namenode/ha/TestRetryCacheWithHA.java | 10 +-
.../hadoop-hdfs/src/test/resources/editsStored | Bin 5586 -> 5803 bytes
.../src/test/resources/editsStored.xml | 437 ++++++++++---------
42 files changed, 1509 insertions(+), 550 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2848db81/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CreateFlag.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CreateFlag.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CreateFlag.java
index c5d23b4..e008ecc 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CreateFlag.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CreateFlag.java
@@ -47,6 +47,10 @@ import org.apache.hadoop.classification.InterfaceStability;
* <li> SYNC_BLOCK - to force closed blocks to the disk device.
* In addition {@link Syncable#hsync()} should be called after each write,
* if true synchronous behavior is required.</li>
+ * <li> LAZY_PERSIST - Create the block on transient storage (RAM) if
+ * available.</li>
+ * <li> APPEND_NEWBLOCK - Append data to a new block instead of end of the last
+ * partial block.</li>
* </ol>
*
* Following combination is not valid and will result in
@@ -93,7 +97,13 @@ public enum CreateFlag {
* This flag must only be used for intermediate data whose loss can be
* tolerated by the application.
*/
- LAZY_PERSIST((short) 0x10);
+ LAZY_PERSIST((short) 0x10),
+
+ /**
+ * Append data to a new block instead of the end of the last partial block.
+ * This is only useful for APPEND.
+ */
+ NEW_BLOCK((short) 0x20);
private final short mode;
@@ -149,4 +159,16 @@ public enum CreateFlag {
+ ". Create option is not specified in " + flag);
}
}
+
+ /**
+ * Validate the CreateFlag for the append operation. The flag must contain
+ * APPEND, and cannot contain OVERWRITE.
+ */
+ public static void validateForAppend(EnumSet<CreateFlag> flag) {
+ validate(flag);
+ if (!flag.contains(APPEND)) {
+ throw new HadoopIllegalArgumentException(flag
+ + " does not contain APPEND");
+ }
+ }
}
\ No newline at end of file
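For context, a minimal sketch of how the new flag composes with APPEND (this assumes the patched hadoop-common on the classpath; the class name is illustrative, not part of the patch):

import java.util.EnumSet;
import org.apache.hadoop.fs.CreateFlag;

public class AppendFlagSketch {
  public static void main(String[] args) {
    // APPEND alone keeps the old behavior: data is written to the end of
    // the last partial block. Adding NEW_BLOCK requests a fresh block,
    // which is what produces variable-length blocks.
    EnumSet<CreateFlag> flags =
        EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK);
    CreateFlag.validateForAppend(flags); // throws if APPEND is missing
  }
}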
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2848db81/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSOutputSummer.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSOutputSummer.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSOutputSummer.java
index 934421a..13a5e26 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSOutputSummer.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSOutputSummer.java
@@ -165,7 +165,7 @@ abstract public class FSOutputSummer extends OutputStream {
count = partialLen;
System.arraycopy(buf, bufLen - count, buf, 0, count);
} else {
- count = 0;
+ count = 0;
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2848db81/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java
index df02e04..52c75ed 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java
@@ -18,10 +18,12 @@
package org.apache.hadoop.hdfs.nfs.nfs3;
import java.io.IOException;
+import java.util.EnumSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.apache.hadoop.hdfs.nfs.conf.NfsConfigKeys;
@@ -147,7 +149,8 @@ public class WriteManager {
CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT);
- fos = dfsClient.append(fileIdPath, bufferSize, null, null);
+ fos = dfsClient.append(fileIdPath, bufferSize,
+ EnumSet.of(CreateFlag.APPEND), null, null);
latestAttr = Nfs3Utils.getFileAttr(dfsClient, fileIdPath, iug);
} catch (RemoteException e) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2848db81/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 1e1af97..b867a70 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -18,6 +18,8 @@ Trunk (Unreleased)
HDFS-3125. Add JournalService to enable Journal Daemon. (suresh)
+ HDFS-3689. Add support for variable length block. (jing9)
+
IMPROVEMENTS
HDFS-4665. Move TestNetworkTopologyWithNodeGroup to common.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2848db81/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index 21f75a5..8512156 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -1656,9 +1656,8 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
* @param checksumOpt checksum options
*
* @return output stream
- *
- * @see ClientProtocol#create(String, FsPermission, String, EnumSetWritable,
- * boolean, short, long) for detailed description of exceptions thrown
+ *
+ * @see ClientProtocol#create for detailed description of exceptions thrown
*/
public DFSOutputStream create(String src,
FsPermission permission,
@@ -1732,7 +1731,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
}
return null;
}
- return callAppend(src, buffersize, progress);
+ return callAppend(src, buffersize, flag, progress);
}
return null;
}
@@ -1810,11 +1809,16 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
}
/** Method to get stream returned by append call */
- private DFSOutputStream callAppend(String src,
- int buffersize, Progressable progress) throws IOException {
- LastBlockWithStatus lastBlockWithStatus = null;
- try {
- lastBlockWithStatus = namenode.append(src, clientName);
+ private DFSOutputStream callAppend(String src, int buffersize,
+ EnumSet<CreateFlag> flag, Progressable progress) throws IOException {
+ CreateFlag.validateForAppend(flag);
+ try {
+ LastBlockWithStatus blkWithStatus = namenode.append(src, clientName,
+ new EnumSetWritable<>(flag, CreateFlag.class));
+ return DFSOutputStream.newStreamForAppend(this, src,
+ flag.contains(CreateFlag.NEW_BLOCK),
+ buffersize, progress, blkWithStatus.getLastBlock(),
+ blkWithStatus.getFileStatus(), dfsClientConf.createChecksum());
} catch(RemoteException re) {
throw re.unwrapRemoteException(AccessControlException.class,
FileNotFoundException.class,
@@ -1824,10 +1828,6 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
UnresolvedPathException.class,
SnapshotAccessControlException.class);
}
- HdfsFileStatus newStat = lastBlockWithStatus.getFileStatus();
- return DFSOutputStream.newStreamForAppend(this, src, buffersize, progress,
- lastBlockWithStatus.getLastBlock(), newStat,
- dfsClientConf.createChecksum());
}
/**
@@ -1835,23 +1835,25 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
*
* @param src file name
* @param buffersize buffer size
+ * @param flag indicates whether to append data to a new block instead of
+ * the last block
* @param progress for reporting write-progress; null is acceptable.
* @param statistics file system statistics; null is acceptable.
* @return an output stream for writing into the file
*
- * @see ClientProtocol#append(String, String)
+ * @see ClientProtocol#append(String, String, EnumSetWritable)
*/
public HdfsDataOutputStream append(final String src, final int buffersize,
- final Progressable progress, final FileSystem.Statistics statistics
- ) throws IOException {
- final DFSOutputStream out = append(src, buffersize, progress);
+ EnumSet<CreateFlag> flag, final Progressable progress,
+ final FileSystem.Statistics statistics) throws IOException {
+ final DFSOutputStream out = append(src, buffersize, flag, progress);
return createWrappedOutputStream(out, statistics, out.getInitialLen());
}
- private DFSOutputStream append(String src, int buffersize, Progressable progress)
- throws IOException {
+ private DFSOutputStream append(String src, int buffersize,
+ EnumSet<CreateFlag> flag, Progressable progress) throws IOException {
checkOpen();
- final DFSOutputStream result = callAppend(src, buffersize, progress);
+ final DFSOutputStream result = callAppend(src, buffersize, flag, progress);
beginFileLease(result.getFileId(), result);
return result;
}
@@ -1938,7 +1940,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
/**
* Move blocks from src to trg and delete src
- * See {@link ClientProtocol#concat(String, String [])}.
+ * See {@link ClientProtocol#concat}.
*/
public void concat(String trg, String [] srcs) throws IOException {
checkOpen();
@@ -1980,7 +1982,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
/**
* Truncate a file to an indicated size
- * See {@link ClientProtocol#truncate(String, long)}.
+ * See {@link ClientProtocol#truncate}.
*/
public boolean truncate(String src, long newLength) throws IOException {
checkOpen();
@@ -3005,7 +3007,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
/**
* Get {@link ContentSummary} rooted at the specified directory.
- * @param path The string representation of the path
+ * @param src The string representation of the path
*
* @see ClientProtocol#getContentSummary(String)
*/
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2848db81/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index 67d3143..8cebda1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -426,15 +426,16 @@ public class DFSOutputStream extends FSOutputSummer
/**
* construction with tracing info
*/
- private DataStreamer(HdfsFileStatus stat, Span span) {
+ private DataStreamer(HdfsFileStatus stat, ExtendedBlock block, Span span) {
isAppend = false;
isLazyPersistFile = isLazyPersist(stat);
+ this.block = block;
stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
traceSpan = span;
}
/**
- * Construct a data streamer for append
+ * Construct a data streamer for appending to the last partial block
* @param lastBlock last block of the file to be appended
* @param stat status of the file to be appended
* @param bytesPerChecksum number of bytes per checksum
@@ -1716,7 +1717,7 @@ public class DFSOutputStream extends FSOutputSummer
if (Trace.isTracing()) {
traceSpan = Trace.startSpan(this.getClass().getSimpleName()).detach();
}
- streamer = new DataStreamer(stat, traceSpan);
+ streamer = new DataStreamer(stat, null, traceSpan);
if (favoredNodes != null && favoredNodes.length != 0) {
streamer.setFavoredNodes(favoredNodes);
}
@@ -1773,7 +1774,7 @@ public class DFSOutputStream extends FSOutputSummer
}
/** Construct a new output stream for append. */
- private DFSOutputStream(DFSClient dfsClient, String src,
+ private DFSOutputStream(DFSClient dfsClient, String src, boolean toNewBlock,
Progressable progress, LocatedBlock lastBlock, HdfsFileStatus stat,
DataChecksum checksum) throws IOException {
this(dfsClient, src, progress, stat, checksum);
@@ -1785,21 +1786,24 @@ public class DFSOutputStream extends FSOutputSummer
}
// The last partial block of the file has to be filled.
- if (lastBlock != null) {
+ if (!toNewBlock && lastBlock != null) {
// indicate that we are appending to an existing block
bytesCurBlock = lastBlock.getBlockSize();
streamer = new DataStreamer(lastBlock, stat, bytesPerChecksum, traceSpan);
} else {
- computePacketChunkSize(dfsClient.getConf().writePacketSize, bytesPerChecksum);
- streamer = new DataStreamer(stat, traceSpan);
+ computePacketChunkSize(dfsClient.getConf().writePacketSize,
+ bytesPerChecksum);
+ streamer = new DataStreamer(stat,
+ lastBlock != null ? lastBlock.getBlock() : null, traceSpan);
}
this.fileEncryptionInfo = stat.getFileEncryptionInfo();
}
static DFSOutputStream newStreamForAppend(DFSClient dfsClient, String src,
- int buffersize, Progressable progress, LocatedBlock lastBlock,
- HdfsFileStatus stat, DataChecksum checksum) throws IOException {
- final DFSOutputStream out = new DFSOutputStream(dfsClient, src,
+ boolean toNewBlock, int bufferSize, Progressable progress,
+ LocatedBlock lastBlock, HdfsFileStatus stat, DataChecksum checksum)
+ throws IOException {
+ final DFSOutputStream out = new DFSOutputStream(dfsClient, src, toNewBlock,
progress, lastBlock, stat, checksum);
out.start();
return out;
@@ -1995,35 +1999,37 @@ public class DFSOutputStream extends FSOutputSummer
long toWaitFor;
long lastBlockLength = -1L;
boolean updateLength = syncFlags.contains(SyncFlag.UPDATE_LENGTH);
+ boolean endBlock = syncFlags.contains(SyncFlag.END_BLOCK);
synchronized (this) {
- // flush checksum buffer, but keep checksum buffer intact
- int numKept = flushBuffer(true, true);
+ // flush checksum buffer, but keep checksum buffer intact if we do not
+ // need to end the current block
+ int numKept = flushBuffer(!endBlock, true);
// bytesCurBlock potentially incremented if there was buffered data
if (DFSClient.LOG.isDebugEnabled()) {
- DFSClient.LOG.debug(
- "DFSClient flush() :" +
- " bytesCurBlock " + bytesCurBlock +
- " lastFlushOffset " + lastFlushOffset);
+ DFSClient.LOG.debug("DFSClient flush():"
+ + " bytesCurBlock=" + bytesCurBlock
+ + " lastFlushOffset=" + lastFlushOffset
+ + " createNewBlock=" + endBlock);
}
// Flush only if we haven't already flushed till this offset.
if (lastFlushOffset != bytesCurBlock) {
assert bytesCurBlock > lastFlushOffset;
// record the valid offset of this flush
lastFlushOffset = bytesCurBlock;
- if (isSync && currentPacket == null) {
+ if (isSync && currentPacket == null && !endBlock) {
// Nothing to send right now,
// but sync was requested.
- // Send an empty packet
+ // Send an empty packet if we do not end the block right now
currentPacket = createPacket(packetSize, chunksPerPacket,
bytesCurBlock, currentSeqno++);
}
} else {
- if (isSync && bytesCurBlock > 0) {
+ if (isSync && bytesCurBlock > 0 && !endBlock) {
// Nothing to send right now,
// and the block was partially written,
// and sync was requested.
- // So send an empty sync packet.
+ // So send an empty sync packet if we do not end the block right now
currentPacket = createPacket(packetSize, chunksPerPacket,
bytesCurBlock, currentSeqno++);
} else if (currentPacket != null) {
@@ -2036,10 +2042,21 @@ public class DFSOutputStream extends FSOutputSummer
currentPacket.syncBlock = isSync;
waitAndQueueCurrentPacket();
}
- // Restore state of stream. Record the last flush offset
- // of the last full chunk that was flushed.
- //
- bytesCurBlock -= numKept;
+ if (endBlock && bytesCurBlock > 0) {
+ // Need to end the current block, thus send an empty packet to
+ // indicate this is the end of the block and reset bytesCurBlock
+ currentPacket = createPacket(0, 0, bytesCurBlock, currentSeqno++);
+ currentPacket.lastPacketInBlock = true;
+ currentPacket.syncBlock = shouldSyncBlock || isSync;
+ waitAndQueueCurrentPacket();
+ bytesCurBlock = 0;
+ lastFlushOffset = 0;
+ } else {
+ // Restore state of stream. Record the last flush offset
+ // of the last full chunk that was flushed.
+ bytesCurBlock -= numKept;
+ }
+
toWaitFor = lastQueuedSeqno;
} // end synchronized
@@ -2058,8 +2075,8 @@ public class DFSOutputStream extends FSOutputSummer
// namenode.
if (persistBlocks.getAndSet(false) || updateLength) {
try {
- dfsClient.namenode.fsync(src, fileId,
- dfsClient.clientName, lastBlockLength);
+ dfsClient.namenode.fsync(src, fileId, dfsClient.clientName,
+ lastBlockLength);
} catch (IOException ioe) {
DFSClient.LOG.warn("Unable to persist blocks in hflush for " + src, ioe);
// If we got an error here, it might be because some other thread called
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2848db81/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
index 654e2f9..710ab18 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
@@ -314,13 +314,19 @@ public class DistributedFileSystem extends FileSystem {
@Override
public FSDataOutputStream append(Path f, final int bufferSize,
final Progressable progress) throws IOException {
+ return append(f, EnumSet.of(CreateFlag.APPEND), bufferSize, progress);
+ }
+
+ public FSDataOutputStream append(Path f, final EnumSet<CreateFlag> flag,
+ final int bufferSize, final Progressable progress) throws IOException {
statistics.incrementWriteOps(1);
Path absF = fixRelativePart(f);
return new FileSystemLinkResolver<FSDataOutputStream>() {
@Override
public FSDataOutputStream doCall(final Path p)
- throws IOException, UnresolvedLinkException {
- return dfs.append(getPathName(p), bufferSize, progress, statistics);
+ throws IOException {
+ return dfs.append(getPathName(p), bufferSize, flag, progress,
+ statistics);
}
@Override
public FSDataOutputStream next(final FileSystem fs, final Path p)
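The new overload above is the public entry point for the feature. A hedged usage sketch follows (the path, buffer size, and cluster configuration are illustrative assumptions; the file must already exist):

import java.util.EnumSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;

public class NewBlockAppendSketch {
  public static void main(String[] args) throws Exception {
    Path file = new Path("/demo/data.log");      // illustrative path
    Configuration conf = new Configuration();    // assumes an HDFS default fs
    DistributedFileSystem dfs =
        (DistributedFileSystem) file.getFileSystem(conf);
    // Append to an existing file, but start a new block instead of
    // filling the last partial one.
    try (FSDataOutputStream out = dfs.append(file,
        EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null)) {
      out.write("more data\n".getBytes("UTF-8"));
    }
  }
}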
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2848db81/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/HdfsDataOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/HdfsDataOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/HdfsDataOutputStream.java
index 2149678..745ca7e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/HdfsDataOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/HdfsDataOutputStream.java
@@ -101,6 +101,12 @@ public class HdfsDataOutputStream extends FSDataOutputStream {
* When doing sync to DataNodes, also update the metadata (block length) in
* the NameNode.
*/
- UPDATE_LENGTH;
+ UPDATE_LENGTH,
+
+ /**
+ * Sync the data to DataNode, close the current block, and allocate a new
+ * block
+ */
+ END_BLOCK;
}
}
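END_BLOCK is the writer-side counterpart: an hsync with this flag persists what has been written, seals the current (possibly short) block, and lets the next write allocate a new one. A minimal sketch, assuming the stream and record come from the caller:

import java.io.IOException;
import java.util.EnumSet;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag;

public class EndBlockSketch {
  // 'out' would be the HdfsDataOutputStream returned by a create() or
  // append() call; 'record' is whatever unit the application wants to
  // keep within a single block.
  static void writeAndSeal(HdfsDataOutputStream out, byte[] record)
      throws IOException {
    out.write(record);
    // Flush to the DataNodes and end the current block, however short.
    out.hsync(EnumSet.of(SyncFlag.END_BLOCK));
  }
}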
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2848db81/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/inotify/Event.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/inotify/Event.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/inotify/Event.java
index 5ceff1b..a6de289 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/inotify/Event.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/inotify/Event.java
@@ -463,15 +463,22 @@ public abstract class Event {
*/
public static class AppendEvent extends Event {
private String path;
+ private boolean newBlock;
public static class Builder {
private String path;
+ private boolean newBlock;
public Builder path(String path) {
this.path = path;
return this;
}
+ public Builder newBlock(boolean newBlock) {
+ this.newBlock = newBlock;
+ return this;
+ }
+
public AppendEvent build() {
return new AppendEvent(this);
}
@@ -480,11 +487,16 @@ public abstract class Event {
private AppendEvent(Builder b) {
super(EventType.APPEND);
this.path = b.path;
+ this.newBlock = b.newBlock;
}
public String getPath() {
return path;
}
+
+ public boolean toNewBlock() {
+ return newBlock;
+ }
}
/**
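On the inotify side, consumers can now tell the two append modes apart. A sketch of reading the new field (the event would come from an HDFS inotify stream such as DFSInotifyEventInputStream; the handler name is illustrative):

import org.apache.hadoop.hdfs.inotify.Event;

public class AppendEventSketch {
  static void handle(Event event) {
    if (event.getEventType() == Event.EventType.APPEND) {
      Event.AppendEvent append = (Event.AppendEvent) event;
      System.out.println("append to " + append.getPath()
          + (append.toNewBlock() ? " (new block)" : " (last partial block)"));
    }
  }
}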
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2848db81/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
index cfd1c67..cba1982 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
@@ -203,6 +203,7 @@ public interface ClientProtocol {
* Append to the end of the file.
* @param src path of the file being created.
* @param clientName name of the current client.
+ * @param flag indicates whether the data is appended to a new block.
* @return wrapper with information about the last partial block and file
* status if any
* @throws AccessControlException if permission to append file is
@@ -225,10 +226,10 @@ public interface ClientProtocol {
* @throws UnsupportedOperationException if append is not supported
*/
@AtMostOnce
- public LastBlockWithStatus append(String src, String clientName)
- throws AccessControlException, DSQuotaExceededException,
- FileNotFoundException, SafeModeException, UnresolvedLinkException,
- SnapshotAccessControlException, IOException;
+ public LastBlockWithStatus append(String src, String clientName,
+ EnumSetWritable<CreateFlag> flag) throws AccessControlException,
+ DSQuotaExceededException, FileNotFoundException, SafeModeException,
+ UnresolvedLinkException, SnapshotAccessControlException, IOException;
/**
* Set replication for an existing file.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2848db81/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
index 8bcc1eb..dbb8b85 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
@@ -18,12 +18,14 @@
package org.apache.hadoop.hdfs.protocolPB;
import java.io.IOException;
+import java.util.EnumSet;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries;
import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FsServerDefaults;
import org.apache.hadoop.fs.Options.Rename;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
@@ -65,6 +67,8 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AllowS
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AllowSnapshotResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AppendRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AppendResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CheckAccessRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CheckAccessResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CompleteRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CompleteResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ConcatRequestProto;
@@ -187,8 +191,6 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.Update
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdateBlockForPipelineResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdatePipelineRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdatePipelineResponseProto;
-import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CheckAccessRequestProto;
-import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CheckAccessResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.CreateEncryptionZoneResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.CreateEncryptionZoneRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.GetEZForPathResponseProto;
@@ -209,6 +211,7 @@ import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.SetXAttrResponseProto;
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.server.namenode.INodeId;
+import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.proto.SecurityProtos.CancelDelegationTokenRequestProto;
import org.apache.hadoop.security.proto.SecurityProtos.CancelDelegationTokenResponseProto;
@@ -412,8 +415,11 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
public AppendResponseProto append(RpcController controller,
AppendRequestProto req) throws ServiceException {
try {
+ EnumSetWritable<CreateFlag> flags = req.hasFlag() ?
+ PBHelper.convertCreateFlag(req.getFlag()) :
+ new EnumSetWritable<>(EnumSet.of(CreateFlag.APPEND));
LastBlockWithStatus result = server.append(req.getSrc(),
- req.getClientName());
+ req.getClientName(), flags);
AppendResponseProto.Builder builder = AppendResponseProto.newBuilder();
if (result.getLastBlock() != null) {
builder.setBlock(PBHelper.convert(result.getLastBlock()));
@@ -522,7 +528,7 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
throw new ServiceException(e);
}
}
-
+
@Override
public CompleteResponseProto complete(RpcController controller,
CompleteRequestProto req) throws ServiceException {
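The server-side translator keeps the wire change backward compatible: the proto field is optional, so a request from an older client simply lacks it and is treated as a plain APPEND. An annotated restatement of the branch above:

    // Old client: no flag field serialized, so hasFlag() is false.
    // New client: the uint32 carries CreateFlag bits (see PBHelper below).
    EnumSetWritable<CreateFlag> flags = req.hasFlag()
        ? PBHelper.convertCreateFlag(req.getFlag())
        : new EnumSetWritable<>(EnumSet.of(CreateFlag.APPEND));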
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2848db81/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
index f3826af..1d6c0ee 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
@@ -28,7 +28,6 @@ import com.google.common.collect.Lists;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.crypto.CipherSuite;
import org.apache.hadoop.crypto.CryptoProtocolVersion;
import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries;
import org.apache.hadoop.fs.CacheFlag;
@@ -85,6 +84,7 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AllowS
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AppendRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AppendResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CachePoolEntryProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CheckAccessRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CompleteRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ConcatRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CreateRequestProto;
@@ -158,13 +158,11 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetTim
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.TruncateRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdateBlockForPipelineRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdatePipelineRequestProto;
-import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CheckAccessRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetStoragePolicyRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos;
import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.CreateEncryptionZoneRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.GetEZForPathRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.ListEncryptionZonesRequestProto;
-import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.GetXAttrsRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.ListXAttrsRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.RemoveXAttrRequestProto;
@@ -318,13 +316,12 @@ public class ClientNamenodeProtocolTranslatorPB implements
}
@Override
- public LastBlockWithStatus append(String src, String clientName)
- throws AccessControlException, DSQuotaExceededException,
- FileNotFoundException, SafeModeException, UnresolvedLinkException,
- IOException {
- AppendRequestProto req = AppendRequestProto.newBuilder()
- .setSrc(src)
- .setClientName(clientName)
+ public LastBlockWithStatus append(String src, String clientName,
+ EnumSetWritable<CreateFlag> flag) throws AccessControlException,
+ DSQuotaExceededException, FileNotFoundException, SafeModeException,
+ UnresolvedLinkException, IOException {
+ AppendRequestProto req = AppendRequestProto.newBuilder().setSrc(src)
+ .setClientName(clientName).setFlag(PBHelper.convertCreateFlag(flag))
.build();
try {
AppendResponseProto res = rpcProxy.append(null, req);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2848db81/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
index 7187838..e4746cc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
@@ -1373,6 +1373,9 @@ public class PBHelper {
if (flag.contains(CreateFlag.LAZY_PERSIST)) {
value |= CreateFlagProto.LAZY_PERSIST.getNumber();
}
+ if (flag.contains(CreateFlag.NEW_BLOCK)) {
+ value |= CreateFlagProto.NEW_BLOCK.getNumber();
+ }
return value;
}
@@ -1393,7 +1396,11 @@ public class PBHelper {
== CreateFlagProto.LAZY_PERSIST_VALUE) {
result.add(CreateFlag.LAZY_PERSIST);
}
- return new EnumSetWritable<CreateFlag>(result);
+ if ((flag & CreateFlagProto.NEW_BLOCK_VALUE)
+ == CreateFlagProto.NEW_BLOCK_VALUE) {
+ result.add(CreateFlag.NEW_BLOCK);
+ }
+ return new EnumSetWritable<CreateFlag>(result, CreateFlag.class);
}
public static int convertCacheFlags(EnumSet<CacheFlag> flags) {
@@ -2605,11 +2612,11 @@ public class PBHelper {
.build());
break;
case EVENT_APPEND:
- InotifyProtos.AppendEventProto reopen =
+ InotifyProtos.AppendEventProto append =
InotifyProtos.AppendEventProto.parseFrom(p.getContents());
- events.add(new Event.AppendEvent.Builder()
- .path(reopen.getPath())
- .build());
+ events.add(new Event.AppendEvent.Builder().path(append.getPath())
+ .newBlock(append.hasNewBlock() && append.getNewBlock())
+ .build());
break;
case EVENT_UNLINK:
InotifyProtos.UnlinkEventProto unlink =
@@ -2710,10 +2717,10 @@ public class PBHelper {
Event.AppendEvent re2 = (Event.AppendEvent) e;
events.add(InotifyProtos.EventProto.newBuilder()
.setType(InotifyProtos.EventType.EVENT_APPEND)
- .setContents(
- InotifyProtos.AppendEventProto.newBuilder()
- .setPath(re2.getPath()).build().toByteString()
- ).build());
+ .setContents(InotifyProtos.AppendEventProto.newBuilder()
+ .setPath(re2.getPath())
+ .setNewBlock(re2.toNewBlock()).build().toByteString())
+ .build());
break;
case UNLINK:
Event.UnlinkEvent ue = (Event.UnlinkEvent) e;
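A worked round trip of the flag bitmask may help here. APPEND is 0x04 and NEW_BLOCK is 0x20 in CreateFlagProto, and the decode side now uses the two-argument EnumSetWritable constructor (with CreateFlag.class), presumably so the writable can be built even when no bits are set.

    import java.util.EnumSet;
    import org.apache.hadoop.fs.CreateFlag;
    import org.apache.hadoop.io.EnumSetWritable;

    // APPEND | NEW_BLOCK serializes to 0x04 | 0x20 = 0x24 ...
    int wire = PBHelper.convertCreateFlag(new EnumSetWritable<>(
        EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), CreateFlag.class));
    assert wire == 0x24;
    // ... and decodes back losslessly on the NameNode side.
    assert PBHelper.convertCreateFlag(wire).get().contains(CreateFlag.NEW_BLOCK);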
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2848db81/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/webhdfs/WebHdfsHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/webhdfs/WebHdfsHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/webhdfs/WebHdfsHandler.java
index f02780a..be1faec 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/webhdfs/WebHdfsHandler.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/webhdfs/WebHdfsHandler.java
@@ -176,7 +176,8 @@ public class WebHdfsHandler extends SimpleChannelInboundHandler<HttpRequest> {
final int bufferSize = params.bufferSize();
DFSClient dfsClient = newDfsClient(nnId, conf);
- OutputStream out = dfsClient.append(path, bufferSize, null, null);
+ OutputStream out = dfsClient.append(path, bufferSize,
+ EnumSet.of(CreateFlag.APPEND), null, null);
DefaultHttpResponse resp = new DefaultHttpResponse(HTTP_1_1, OK);
resp.headers().set(CONTENT_LENGTH, 0);
ctx.pipeline().replace(this, HdfsWriter.class.getSimpleName(),
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2848db81/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirConcatOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirConcatOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirConcatOp.java
index 43d3b20..ecfd2e1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirConcatOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirConcatOp.java
@@ -19,11 +19,10 @@ package org.apache.hadoop.hdfs.server.namenode;
import com.google.common.base.Preconditions;
import org.apache.hadoop.HadoopIllegalArgumentException;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
import org.apache.hadoop.hdfs.protocol.SnapshotException;
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import java.io.IOException;
import java.util.Arrays;
@@ -33,201 +32,171 @@ import java.util.Set;
import static org.apache.hadoop.util.Time.now;
class FSDirConcatOp {
- static HdfsFileStatus concat(
- FSDirectory fsd, String target, String[] srcs,
+
+ static HdfsFileStatus concat(FSDirectory fsd, String target, String[] srcs,
boolean logRetryCache) throws IOException {
Preconditions.checkArgument(!target.isEmpty(), "Target file name is empty");
Preconditions.checkArgument(srcs != null && srcs.length > 0,
"No sources given");
assert srcs != null;
-
- FSDirectory.LOG.debug("concat {} to {}", Arrays.toString(srcs), target);
- // We require all files be in the same directory
- String trgParent =
- target.substring(0, target.lastIndexOf(Path.SEPARATOR_CHAR));
- for (String s : srcs) {
- String srcParent = s.substring(0, s.lastIndexOf(Path.SEPARATOR_CHAR));
- if (!srcParent.equals(trgParent)) {
- throw new IllegalArgumentException(
- "Sources and target are not in the same directory");
- }
+ if (FSDirectory.LOG.isDebugEnabled()) {
+ FSDirectory.LOG.debug("concat {} to {}", Arrays.toString(srcs), target);
}
- final INodesInPath trgIip = fsd.getINodesInPath4Write(target);
+ final INodesInPath targetIIP = fsd.getINodesInPath4Write(target);
// write permission for the target
+ FSPermissionChecker pc = null;
if (fsd.isPermissionEnabled()) {
- FSPermissionChecker pc = fsd.getPermissionChecker();
- fsd.checkPathAccess(pc, trgIip, FsAction.WRITE);
-
- // and srcs
- for(String aSrc: srcs) {
- final INodesInPath srcIip = fsd.getINodesInPath4Write(aSrc);
- fsd.checkPathAccess(pc, srcIip, FsAction.READ); // read the file
- fsd.checkParentAccess(pc, srcIip, FsAction.WRITE); // for delete
- }
+ pc = fsd.getPermissionChecker();
+ fsd.checkPathAccess(pc, targetIIP, FsAction.WRITE);
}
- // to make sure no two files are the same
- Set<INode> si = new HashSet<INode>();
+ // check the target
+ verifyTargetFile(fsd, target, targetIIP);
+ // check the srcs
+ INodeFile[] srcFiles = verifySrcFiles(fsd, srcs, targetIIP, pc);
- // we put the following prerequisite for the operation
- // replication and blocks sizes should be the same for ALL the blocks
+ if(NameNode.stateChangeLog.isDebugEnabled()) {
+ NameNode.stateChangeLog.debug("DIR* NameSystem.concat: " +
+ Arrays.toString(srcs) + " to " + target);
+ }
+
+ long timestamp = now();
+ fsd.writeLock();
+ try {
+ unprotectedConcat(fsd, targetIIP, srcFiles, timestamp);
+ } finally {
+ fsd.writeUnlock();
+ }
+ fsd.getEditLog().logConcat(target, srcs, timestamp, logRetryCache);
+ return fsd.getAuditFileInfo(targetIIP);
+ }
+ private static void verifyTargetFile(FSDirectory fsd, final String target,
+ final INodesInPath targetIIP) throws IOException {
// check the target
- if (fsd.getEZForPath(trgIip) != null) {
+ if (fsd.getEZForPath(targetIIP) != null) {
throw new HadoopIllegalArgumentException(
"concat can not be called for files in an encryption zone.");
}
- final INodeFile trgInode = INodeFile.valueOf(trgIip.getLastINode(), target);
- if(trgInode.isUnderConstruction()) {
+ final INodeFile targetINode = INodeFile.valueOf(targetIIP.getLastINode(),
+ target);
+ if(targetINode.isUnderConstruction()) {
throw new HadoopIllegalArgumentException("concat: target file "
+ target + " is under construction");
}
- // per design target shouldn't be empty and all the blocks same size
- if(trgInode.numBlocks() == 0) {
- throw new HadoopIllegalArgumentException("concat: target file "
- + target + " is empty");
- }
- if (trgInode.isWithSnapshot()) {
- throw new HadoopIllegalArgumentException("concat: target file "
- + target + " is in a snapshot");
- }
-
- long blockSize = trgInode.getPreferredBlockSize();
-
- // check the end block to be full
- final BlockInfo last = trgInode.getLastBlock();
- if(blockSize != last.getNumBytes()) {
- throw new HadoopIllegalArgumentException("The last block in " + target
- + " is not full; last block size = " + last.getNumBytes()
- + " but file block size = " + blockSize);
- }
-
- si.add(trgInode);
- final short repl = trgInode.getFileReplication();
+ }
+ private static INodeFile[] verifySrcFiles(FSDirectory fsd, String[] srcs,
+ INodesInPath targetIIP, FSPermissionChecker pc) throws IOException {
+ // to make sure no two files are the same
+ Set<INodeFile> si = new HashSet<>();
+ final INodeFile targetINode = targetIIP.getLastINode().asFile();
+ final INodeDirectory targetParent = targetINode.getParent();
// now check the srcs
- boolean endSrc = false; // final src file doesn't have to have full end block
- for(int i=0; i< srcs.length; i++) {
- String src = srcs[i];
- if(i== srcs.length-1)
- endSrc=true;
-
- final INodeFile srcInode = INodeFile.valueOf(fsd.getINode4Write(src), src);
- if(src.isEmpty()
- || srcInode.isUnderConstruction()
- || srcInode.numBlocks() == 0) {
- throw new HadoopIllegalArgumentException("concat: source file " + src
- + " is invalid or empty or underConstruction");
+ for(String src : srcs) {
+ final INodesInPath iip = fsd.getINodesInPath4Write(src);
+ // permission check for srcs
+ if (pc != null) {
+ fsd.checkPathAccess(pc, iip, FsAction.READ); // read the file
+ fsd.checkParentAccess(pc, iip, FsAction.WRITE); // for delete
}
-
- // check replication and blocks size
- if(repl != srcInode.getBlockReplication()) {
- throw new HadoopIllegalArgumentException("concat: the source file "
- + src + " and the target file " + target
- + " should have the same replication: source replication is "
- + srcInode.getBlockReplication()
- + " but target replication is " + repl);
+ final INode srcINode = iip.getLastINode();
+ final INodeFile srcINodeFile = INodeFile.valueOf(srcINode, src);
+ // make sure the src file and the target file are in the same dir
+ if (srcINodeFile.getParent() != targetParent) {
+ throw new HadoopIllegalArgumentException("Source file " + src
+ + " is not in the same directory with the target "
+ + targetIIP.getPath());
}
-
- //boolean endBlock=false;
- // verify that all the blocks are of the same length as target
- // should be enough to check the end blocks
- final BlockInfo[] srcBlocks = srcInode.getBlocks();
- int idx = srcBlocks.length-1;
- if(endSrc)
- idx = srcBlocks.length-2; // end block of endSrc is OK not to be full
- if(idx >= 0 && srcBlocks[idx].getNumBytes() != blockSize) {
- throw new HadoopIllegalArgumentException("concat: the source file "
- + src + " and the target file " + target
- + " should have the same blocks sizes: target block size is "
- + blockSize + " but the size of source block " + idx + " is "
- + srcBlocks[idx].getNumBytes());
+ // make sure all the source files are not in snapshot
+ if (srcINode.isInLatestSnapshot(iip.getLatestSnapshotId())) {
+ throw new SnapshotException("Concat: the source file " + src
+ + " is in snapshot");
}
-
- si.add(srcInode);
+ // check if the file has other references.
+ if (srcINode.isReference() && ((INodeReference.WithCount)
+ srcINode.asReference().getReferredINode()).getReferenceCount() > 1) {
+ throw new SnapshotException("Concat: the source file " + src
+ + " is referred by some other reference in some snapshot.");
+ }
+ if (srcINode == targetINode) {
+ throw new HadoopIllegalArgumentException("concat: the src file " + src
+ + " is the same with the target file " + targetIIP.getPath());
+ }
+ if(srcINodeFile.isUnderConstruction() || srcINodeFile.numBlocks() == 0) {
+ throw new HadoopIllegalArgumentException("concat: source file " + src
+ + " is invalid or empty or underConstruction");
+ }
+ si.add(srcINodeFile);
}
// make sure no two files are the same
- if(si.size() < srcs.length+1) { // trg + srcs
+ if(si.size() < srcs.length) {
// it means at least two files are the same
throw new HadoopIllegalArgumentException(
"concat: at least two of the source files are the same");
}
+ return si.toArray(new INodeFile[si.size()]);
+ }
- if(NameNode.stateChangeLog.isDebugEnabled()) {
- NameNode.stateChangeLog.debug("DIR* NameSystem.concat: " +
- Arrays.toString(srcs) + " to " + target);
+ private static long computeQuotaDelta(INodeFile target, INodeFile[] srcList) {
+ long delta = 0;
+ short targetRepl = target.getBlockReplication();
+ for (INodeFile src : srcList) {
+ if (targetRepl != src.getBlockReplication()) {
+ delta += src.computeFileSize() *
+ (targetRepl - src.getBlockReplication());
+ }
}
+ return delta;
+ }
- long timestamp = now();
- fsd.writeLock();
- try {
- unprotectedConcat(fsd, target, srcs, timestamp);
- } finally {
- fsd.writeUnlock();
+ private static void verifyQuota(FSDirectory fsd, INodesInPath targetIIP,
+ long delta) throws QuotaExceededException {
+ if (!fsd.getFSNamesystem().isImageLoaded() || fsd.shouldSkipQuotaChecks()) {
+ // Do not check quota if editlog is still being processed
+ return;
}
- fsd.getEditLog().logConcat(target, srcs, timestamp, logRetryCache);
- return fsd.getAuditFileInfo(trgIip);
+ FSDirectory.verifyQuota(targetIIP, targetIIP.length() - 1, 0, delta, null);
}
/**
* Concat all the blocks from srcs to trg and delete the srcs files
* @param fsd FSDirectory
- * @param target target file to move the blocks to
- * @param srcs list of file to move the blocks from
*/
- static void unprotectedConcat(
- FSDirectory fsd, String target, String[] srcs, long timestamp)
- throws IOException {
+ static void unprotectedConcat(FSDirectory fsd, INodesInPath targetIIP,
+ INodeFile[] srcList, long timestamp) throws IOException {
assert fsd.hasWriteLock();
if (NameNode.stateChangeLog.isDebugEnabled()) {
- NameNode.stateChangeLog.debug("DIR* FSNamesystem.concat to "+target);
+ NameNode.stateChangeLog.debug("DIR* FSNamesystem.concat to "
+ + targetIIP.getPath());
}
- // do the move
-
- final INodesInPath trgIIP = fsd.getINodesInPath4Write(target, true);
- final INodeFile trgInode = trgIIP.getLastINode().asFile();
- INodeDirectory trgParent = trgIIP.getINode(-2).asDirectory();
- final int trgLatestSnapshot = trgIIP.getLatestSnapshotId();
-
- final INodeFile [] allSrcInodes = new INodeFile[srcs.length];
- for(int i = 0; i < srcs.length; i++) {
- final INodesInPath iip = fsd.getINodesInPath4Write(srcs[i]);
- final int latest = iip.getLatestSnapshotId();
- final INode inode = iip.getLastINode();
-
- // check if the file in the latest snapshot
- if (inode.isInLatestSnapshot(latest)) {
- throw new SnapshotException("Concat: the source file " + srcs[i]
- + " is in snapshot " + latest);
- }
- // check if the file has other references.
- if (inode.isReference() && ((INodeReference.WithCount)
- inode.asReference().getReferredINode()).getReferenceCount() > 1) {
- throw new SnapshotException("Concat: the source file " + srcs[i]
- + " is referred by some other reference in some snapshot.");
- }
+ final INodeFile trgInode = targetIIP.getLastINode().asFile();
+ long delta = computeQuotaDelta(trgInode, srcList);
+ verifyQuota(fsd, targetIIP, delta);
- allSrcInodes[i] = inode.asFile();
- }
- trgInode.concatBlocks(allSrcInodes);
+ // the target file can be included in a snapshot
+ trgInode.recordModification(targetIIP.getLatestSnapshotId());
+ INodeDirectory trgParent = targetIIP.getINode(-2).asDirectory();
+ trgInode.concatBlocks(srcList);
// since we are in the same dir - we can use same parent to remove files
int count = 0;
- for(INodeFile nodeToRemove: allSrcInodes) {
- if(nodeToRemove == null) continue;
-
- nodeToRemove.setBlocks(null);
- trgParent.removeChild(nodeToRemove, trgLatestSnapshot);
- fsd.getINodeMap().remove(nodeToRemove);
- count++;
+ for (INodeFile nodeToRemove : srcList) {
+ if(nodeToRemove != null) {
+ nodeToRemove.setBlocks(null);
+ nodeToRemove.getParent().removeChild(nodeToRemove);
+ fsd.getINodeMap().remove(nodeToRemove);
+ count++;
+ }
}
- trgInode.setModificationTime(timestamp, trgLatestSnapshot);
- trgParent.updateModificationTime(timestamp, trgLatestSnapshot);
+ trgInode.setModificationTime(timestamp, targetIIP.getLatestSnapshotId());
+ trgParent.updateModificationTime(timestamp, targetIIP.getLatestSnapshotId());
// update quota on the parent directory ('count' files removed, 0 space)
- FSDirectory.unprotectedUpdateCount(trgIIP, trgIIP.length() - 1, -count, 0);
+ FSDirectory.unprotectedUpdateCount(targetIIP, targetIIP.length() - 1,
+ -count, delta);
}
}
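The concat rewrite drops the old precondition that every block be full and of identical size; with variable-length blocks the sources only need to be closed, snapshot-free, distinct, non-empty files in the target's directory. Because a source may now have a different replication factor than the target, computeQuotaDelta charges the parent directory for the re-replication. A worked example with hypothetical numbers:

    // Target replication 3, one 128 MB source at replication 2: after the
    // concat those bytes live at the target's factor, so the parent is
    // charged the difference before the blocks move.
    long srcSize = 128L * 1024 * 1024;             // src.computeFileSize()
    short targetRepl = 3, srcRepl = 2;
    long delta = srcSize * (targetRepl - srcRepl); // +128 MB of diskspace
    // verifyQuota(fsd, targetIIP, delta) rejects the op when this would
    // exceed the directory quota; unprotectedUpdateCount() later applies
    // the same delta alongside the -count inode adjustment.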
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2848db81/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
index c171448..c012847 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
@@ -452,7 +452,7 @@ public class FSDirectory implements Closeable {
Preconditions.checkState(fileINode.isUnderConstruction());
// check quota limits and updated space consumed
- updateCount(inodesInPath, 0, fileINode.getBlockDiskspace(), true);
+ updateCount(inodesInPath, 0, fileINode.getPreferredBlockDiskspace(), true);
// associate new last block for the file
BlockInfoUnderConstruction blockInfo =
@@ -508,7 +508,7 @@ public class FSDirectory implements Closeable {
}
// update space consumed
- updateCount(iip, 0, -fileNode.getBlockDiskspace(), true);
+ updateCount(iip, 0, -fileNode.getPreferredBlockDiskspace(), true);
return true;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2848db81/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
index 144be37..3c7eae4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
@@ -34,10 +34,10 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.fs.XAttr;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.PermissionStatus;
-import org.apache.hadoop.fs.XAttr;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
@@ -52,9 +52,11 @@ import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddBlockOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddCacheDirectiveInfoOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddCachePoolOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddCloseOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AllocateBlockIdOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AllowSnapshotOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AppendOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.CancelDelegationTokenOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.CloseOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.ConcatDeleteOp;
@@ -76,6 +78,7 @@ import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameOldOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameSnapshotOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenewDelegationTokenOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RollingUpgradeOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetAclOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetGenstampV1Op;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetGenstampV2Op;
@@ -90,7 +93,6 @@ import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.TimesOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.TruncateOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.UpdateBlocksOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.UpdateMasterKeyOp;
-import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RollingUpgradeOp;
import org.apache.hadoop.hdfs.server.namenode.JournalSet.JournalAndStream;
import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
@@ -702,7 +704,19 @@ public class FSEditLog implements LogsPurgeable {
op.setRpcCallId(Server.getCallId());
}
}
-
+
+ public void logAppendFile(String path, INodeFile file, boolean newBlock,
+ boolean toLogRpcIds) {
+ FileUnderConstructionFeature uc = file.getFileUnderConstructionFeature();
+ assert uc != null;
+ AppendOp op = AppendOp.getInstance(cache.get()).setPath(path)
+ .setClientName(uc.getClientName())
+ .setClientMachine(uc.getClientMachine())
+ .setNewBlock(newBlock);
+ logRpcIds(op, toLogRpcIds);
+ logEdit(op);
+ }
+
/**
* Add open lease record to edit log.
* Records the block locations of the last block.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2848db81/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
index 322e18c..7cb6486 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
@@ -34,6 +34,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.XAttrSetFlag;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
@@ -41,7 +42,6 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus;
import org.apache.hadoop.hdfs.protocol.LayoutVersion;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.RollingUpgradeStartupOption;
@@ -53,6 +53,7 @@ import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddCachePoolOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddCloseOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AllocateBlockIdOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AllowSnapshotOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AppendOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.BlockListUpdatingOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.CancelDelegationTokenOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.ClearNSQuotaOp;
@@ -68,6 +69,7 @@ import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.ModifyCachePoolOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.ReassignLeaseOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RemoveCacheDirectiveInfoOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RemoveCachePoolOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RemoveXAttrOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameOldOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameSnapshotOp;
@@ -83,7 +85,6 @@ import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetQuotaOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetReplicationOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetStoragePolicyOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetXAttrOp;
-import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RemoveXAttrOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SymlinkOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.TimesOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.UpdateBlocksOp;
@@ -325,22 +326,22 @@ public class FSEditLogLoader {
LOG.trace("replaying edit log: " + op);
}
final boolean toAddRetryCache = fsNamesys.hasRetryCache() && op.hasRpcIds();
-
+
switch (op.opCode) {
case OP_ADD: {
AddCloseOp addCloseOp = (AddCloseOp)op;
final String path =
renameReservedPathsOnUpgrade(addCloseOp.path, logVersion);
- if (LOG.isDebugEnabled()) {
- LOG.debug(op.opCode + ": " + path +
+ if (FSNamesystem.LOG.isDebugEnabled()) {
+ FSNamesystem.LOG.debug(op.opCode + ": " + path +
" numblocks : " + addCloseOp.blocks.length +
" clientHolder " + addCloseOp.clientName +
" clientMachine " + addCloseOp.clientMachine);
}
- // There three cases here:
+ // There are 3 cases here:
// 1. OP_ADD to create a new file
// 2. OP_ADD to update file blocks
- // 3. OP_ADD to open file for append
+ // 3. OP_ADD to open file for append (old append)
// See if the file already exists (persistBlocks call)
INodesInPath iip = fsDir.getINodesInPath(path, true);
@@ -383,19 +384,17 @@ public class FSEditLogLoader {
fsNamesys.addCacheEntryWithPayload(addCloseOp.rpcClientId,
addCloseOp.rpcCallId, stat);
}
- } else { // This is OP_ADD on an existing file
+ } else { // This is OP_ADD on an existing file (old append)
if (!oldFile.isUnderConstruction()) {
// This is case 3: a call to append() on an already-closed file.
if (FSNamesystem.LOG.isDebugEnabled()) {
FSNamesystem.LOG.debug("Reopening an already-closed file " +
"for append");
}
- // Note we do not replace the INodeFile when converting it to
- // under-construction
- LocatedBlock lb = fsNamesys.prepareFileForWrite(path, iip,
- addCloseOp.clientName, addCloseOp.clientMachine, false, false);
-
- // add the op into retry cache is necessary
+ LocatedBlock lb = fsNamesys.prepareFileForAppend(path, iip,
+ addCloseOp.clientName, addCloseOp.clientMachine, false, false,
+ false);
+ // add the op into retry cache if necessary
if (toAddRetryCache) {
HdfsFileStatus stat = FSDirStatAndListingOp.createFileStatus(
fsNamesys.dir,
@@ -453,6 +452,34 @@ public class FSEditLogLoader {
}
break;
}
+ case OP_APPEND: {
+ AppendOp appendOp = (AppendOp) op;
+ final String path = renameReservedPathsOnUpgrade(appendOp.path,
+ logVersion);
+ if (FSNamesystem.LOG.isDebugEnabled()) {
+ FSNamesystem.LOG.debug(op.opCode + ": " + path +
+ " clientName " + appendOp.clientName +
+ " clientMachine " + appendOp.clientMachine +
+ " newBlock " + appendOp.newBlock);
+ }
+ INodesInPath iip = fsDir.getINodesInPath4Write(path);
+ INodeFile file = INodeFile.valueOf(iip.getLastINode(), path);
+ if (!file.isUnderConstruction()) {
+ LocatedBlock lb = fsNamesys.prepareFileForAppend(path, iip,
+ appendOp.clientName, appendOp.clientMachine, appendOp.newBlock,
+ false, false);
+ // add the op into retry cache if necessary
+ if (toAddRetryCache) {
+ HdfsFileStatus stat = FSDirStatAndListingOp.createFileStatus(
+ fsNamesys.dir, HdfsFileStatus.EMPTY_NAME, file,
+ BlockStoragePolicySuite.ID_UNSPECIFIED,
+ Snapshot.CURRENT_STATE_ID, false, iip);
+ fsNamesys.addCacheEntryWithPayload(appendOp.rpcClientId,
+ appendOp.rpcCallId, new LastBlockWithStatus(lb, stat));
+ }
+ }
+ break;
+ }
case OP_UPDATE_BLOCKS: {
UpdateBlocksOp updateOp = (UpdateBlocksOp)op;
final String path =
@@ -499,7 +526,14 @@ public class FSEditLogLoader {
srcs[i] =
renameReservedPathsOnUpgrade(concatDeleteOp.srcs[i], logVersion);
}
- FSDirConcatOp.unprotectedConcat(fsDir, trg, srcs, concatDeleteOp.timestamp);
+ INodesInPath targetIIP = fsDir.getINodesInPath4Write(trg);
+ INodeFile[] srcFiles = new INodeFile[srcs.length];
+ for (int i = 0; i < srcs.length; i++) {
+ INodesInPath srcIIP = fsDir.getINodesInPath4Write(srcs[i]);
+ srcFiles[i] = srcIIP.getLastINode().asFile();
+ }
+ FSDirConcatOp.unprotectedConcat(fsDir, targetIIP, srcFiles,
+ concatDeleteOp.timestamp);
if (toAddRetryCache) {
fsNamesys.addCacheEntry(concatDeleteOp.rpcClientId,
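On replay the two append flavors converge: an OP_ADD on an existing closed file is the legacy append, while OP_APPEND carries an explicit newBlock bit. A simplified sketch of that convergence (not the loader's literal code; both calls pass writeToEditLog=false since the log is already being replayed):

    switch (op.opCode) {
    case OP_ADD:      // old append: always reopens the last partial block
      fsNamesys.prepareFileForAppend(path, iip, addCloseOp.clientName,
          addCloseOp.clientMachine, false, false, false);
      break;
    case OP_APPEND:   // new append: honors the persisted newBlock flag
      fsNamesys.prepareFileForAppend(path, iip, appendOp.clientName,
          appendOp.clientMachine, appendOp.newBlock, false, false);
      break;
    }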
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2848db81/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
index 9424156..1629d80 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.server.namenode;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ADD;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_APPEND;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ADD_BLOCK;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ADD_CACHE_DIRECTIVE;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ADD_CACHE_POOL;
@@ -207,6 +208,7 @@ public abstract class FSEditLogOp {
inst.put(OP_SET_XATTR, new SetXAttrOp());
inst.put(OP_REMOVE_XATTR, new RemoveXAttrOp());
inst.put(OP_SET_STORAGE_POLICY, new SetStoragePolicyOp());
+ inst.put(OP_APPEND, new AppendOp());
}
public FSEditLogOp get(FSEditLogOpCodes opcode) {
@@ -428,7 +430,7 @@ public abstract class FSEditLogOp {
private AddCloseOp(FSEditLogOpCodes opCode) {
super(opCode);
storagePolicyId = BlockStoragePolicySuite.ID_UNSPECIFIED;
- assert(opCode == OP_ADD || opCode == OP_CLOSE);
+ assert(opCode == OP_ADD || opCode == OP_CLOSE || opCode == OP_APPEND);
}
@Override
@@ -770,7 +772,7 @@ public abstract class FSEditLogOp {
}
static AddOp getInstance(OpInstanceCache cache) {
- return (AddOp)cache.get(OP_ADD);
+ return (AddOp) cache.get(OP_ADD);
}
@Override
@@ -788,7 +790,7 @@ public abstract class FSEditLogOp {
}
/**
- * Although {@link ClientProtocol#appendFile} may also log a close op, we do
+ * Although {@link ClientProtocol#append} may also log a close op, we do
* not need to record the rpc ids here since a successful appendFile op will
* finally log an AddOp.
*/
@@ -814,6 +816,97 @@ public abstract class FSEditLogOp {
return builder.toString();
}
}
+
+ static class AppendOp extends FSEditLogOp {
+ String path;
+ String clientName;
+ String clientMachine;
+ boolean newBlock;
+
+ private AppendOp() {
+ super(OP_APPEND);
+ }
+
+ static AppendOp getInstance(OpInstanceCache cache) {
+ return (AppendOp) cache.get(OP_APPEND);
+ }
+
+ AppendOp setPath(String path) {
+ this.path = path;
+ return this;
+ }
+
+ AppendOp setClientName(String clientName) {
+ this.clientName = clientName;
+ return this;
+ }
+
+ AppendOp setClientMachine(String clientMachine) {
+ this.clientMachine = clientMachine;
+ return this;
+ }
+
+ AppendOp setNewBlock(boolean newBlock) {
+ this.newBlock = newBlock;
+ return this;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder builder = new StringBuilder();
+ builder.append("AppendOp ");
+ builder.append("[path=").append(path);
+ builder.append(", clientName=").append(clientName);
+ builder.append(", clientMachine=").append(clientMachine);
+ builder.append(", newBlock=").append(newBlock).append("]");
+ return builder.toString();
+ }
+
+ @Override
+ void resetSubFields() {
+ this.path = null;
+ this.clientName = null;
+ this.clientMachine = null;
+ this.newBlock = false;
+ }
+
+ @Override
+ void readFields(DataInputStream in, int logVersion) throws IOException {
+ this.path = FSImageSerialization.readString(in);
+ this.clientName = FSImageSerialization.readString(in);
+ this.clientMachine = FSImageSerialization.readString(in);
+ this.newBlock = FSImageSerialization.readBoolean(in);
+ readRpcIds(in, logVersion);
+ }
+
+ @Override
+ public void writeFields(DataOutputStream out) throws IOException {
+ FSImageSerialization.writeString(path, out);
+ FSImageSerialization.writeString(clientName, out);
+ FSImageSerialization.writeString(clientMachine, out);
+ FSImageSerialization.writeBoolean(newBlock, out);
+ writeRpcIds(rpcClientId, rpcCallId, out);
+ }
+
+ @Override
+ protected void toXml(ContentHandler contentHandler) throws SAXException {
+ XMLUtils.addSaxString(contentHandler, "PATH", path);
+ XMLUtils.addSaxString(contentHandler, "CLIENT_NAME", clientName);
+ XMLUtils.addSaxString(contentHandler, "CLIENT_MACHINE", clientMachine);
+ XMLUtils.addSaxString(contentHandler, "NEWBLOCK",
+ Boolean.toString(newBlock));
+ appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
+ }
+
+ @Override
+ void fromXml(Stanza st) throws InvalidXmlException {
+ this.path = st.getValue("PATH");
+ this.clientName = st.getValue("CLIENT_NAME");
+ this.clientMachine = st.getValue("CLIENT_MACHINE");
+ this.newBlock = Boolean.parseBoolean(st.getValue("NEWBLOCK"));
+ readRpcIdsFromXml(st);
+ }
+ }
static class AddBlockOp extends FSEditLogOp {
private String path;
@@ -1643,7 +1736,7 @@ public abstract class FSEditLogOp {
* {@link ClientProtocol#updateBlockForPipeline},
* {@link ClientProtocol#recoverLease}, {@link ClientProtocol#addBlock}) or
* already bound with other editlog op which records rpc ids (
- * {@link ClientProtocol#startFile}). Thus no need to record rpc ids here.
+ * {@link ClientProtocol#create}). Thus no need to record rpc ids here.
*/
static class SetGenstampV1Op extends FSEditLogOp {
long genStampV1;
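AppendOp follows the same fluent-setter pattern as the surrounding ops. A small construction sketch, mirroring what FSEditLog#logAppendFile does (the client name and machine are hypothetical); writeFields/readFields persist the four fields plus the RPC ids used to rebuild the retry cache, and toXml emits them under PATH, CLIENT_NAME, CLIENT_MACHINE and NEWBLOCK:

    AppendOp op = AppendOp.getInstance(cache.get())
        .setPath("/file_create")
        .setClientName("DFSClient_NONMAPREDUCE_0_1")   // hypothetical
        .setClientMachine("127.0.0.1")
        .setNewBlock(true);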
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2848db81/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOpCodes.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOpCodes.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOpCodes.java
index 468e048..6cd1617 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOpCodes.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOpCodes.java
@@ -74,6 +74,7 @@ public enum FSEditLogOpCodes {
OP_REMOVE_XATTR ((byte) 44),
OP_SET_STORAGE_POLICY ((byte) 45),
OP_TRUNCATE ((byte) 46),
+ OP_APPEND ((byte) 47),
// Note that the current range of the valid OP code is 0~127
OP_INVALID ((byte) -1);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2848db81/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index fae1641..ebdec1b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -250,6 +250,7 @@ import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
+import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.RetriableException;
@@ -2586,12 +2587,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
* <p>
*
* For description of parameters and exceptions thrown see
- * {@link ClientProtocol#append(String, String)}
- *
+ * {@link ClientProtocol#append(String, String, EnumSetWritable)}
+ *
* @return the last block locations if the block is partial or null otherwise
*/
private LocatedBlock appendFileInternal(FSPermissionChecker pc,
- INodesInPath iip, String holder, String clientMachine,
+ INodesInPath iip, String holder, String clientMachine, boolean newBlock,
boolean logRetryCache) throws IOException {
assert hasWriteLock();
// Verify that the destination does not exist as a directory already.
@@ -2613,7 +2614,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
INodeFile myFile = INodeFile.valueOf(inode, src, true);
final BlockStoragePolicy lpPolicy =
blockManager.getStoragePolicy("LAZY_PERSIST");
-
if (lpPolicy != null &&
lpPolicy.getId() == myFile.getStoragePolicyID()) {
throw new UnsupportedOperationException(
@@ -2629,8 +2629,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
throw new IOException("append: lastBlock=" + lastBlock +
" of src=" + src + " is not sufficiently replicated yet.");
}
- return prepareFileForWrite(src, iip, holder, clientMachine, true,
- logRetryCache);
+ return prepareFileForAppend(src, iip, holder, clientMachine, newBlock,
+ true, logRetryCache);
} catch (IOException ie) {
NameNode.stateChangeLog.warn("DIR* NameSystem.append: " +ie.getMessage());
throw ie;
@@ -2644,6 +2644,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
* @param src path to the file
* @param leaseHolder identifier of the lease holder on this file
* @param clientMachine identifier of the client machine
+ * @param newBlock if the data is appended to a new block
* @param writeToEditLog whether to persist this change to the edit log
* @param logRetryCache whether to record RPC ids in editlog for retry cache
* rebuilding
@@ -2651,26 +2652,34 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
* @throws UnresolvedLinkException
* @throws IOException
*/
- LocatedBlock prepareFileForWrite(String src, INodesInPath iip,
- String leaseHolder, String clientMachine, boolean writeToEditLog,
- boolean logRetryCache) throws IOException {
+ LocatedBlock prepareFileForAppend(String src, INodesInPath iip,
+ String leaseHolder, String clientMachine, boolean newBlock,
+ boolean writeToEditLog, boolean logRetryCache) throws IOException {
final INodeFile file = iip.getLastINode().asFile();
file.recordModification(iip.getLatestSnapshotId());
file.toUnderConstruction(leaseHolder, clientMachine);
leaseManager.addLease(
file.getFileUnderConstructionFeature().getClientName(), src);
-
- LocatedBlock ret =
- blockManager.convertLastBlockToUnderConstruction(file, 0);
- if (ret != null) {
- // update the quota: use the preferred block size for UC block
- final long diff = file.getPreferredBlockSize() - ret.getBlockSize();
- dir.updateSpaceConsumed(iip, 0, diff * file.getBlockReplication());
+
+ LocatedBlock ret = null;
+ if (!newBlock) {
+ ret = blockManager.convertLastBlockToUnderConstruction(file, 0);
+ if (ret != null) {
+ // update the quota: use the preferred block size for UC block
+ final long diff = file.getPreferredBlockSize() - ret.getBlockSize();
+ dir.updateSpaceConsumed(iip, 0, diff * file.getBlockReplication());
+ }
+ } else {
+ BlockInfo lastBlock = file.getLastBlock();
+ if (lastBlock != null) {
+ ExtendedBlock blk = new ExtendedBlock(this.getBlockPoolId(), lastBlock);
+ ret = new LocatedBlock(blk, new DatanodeInfo[0]);
+ }
}
if (writeToEditLog) {
- getEditLog().logOpenFile(src, file, false, logRetryCache);
+ getEditLog().logAppendFile(src, file, newBlock, logRetryCache);
}
return ret;
}
@@ -2805,11 +2814,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
/**
* Append to an existing file in the namespace.
*/
- LastBlockWithStatus appendFile(
- String src, String holder, String clientMachine, boolean logRetryCache)
+ LastBlockWithStatus appendFile(String src, String holder,
+ String clientMachine, EnumSet<CreateFlag> flag, boolean logRetryCache)
throws IOException {
try {
- return appendFileInt(src, holder, clientMachine, logRetryCache);
+ return appendFileInt(src, holder, clientMachine,
+ flag.contains(CreateFlag.NEW_BLOCK), logRetryCache);
} catch (AccessControlException e) {
logAuditEvent(false, "append", src);
throw e;
@@ -2817,7 +2827,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
}
private LastBlockWithStatus appendFileInt(final String srcArg, String holder,
- String clientMachine, boolean logRetryCache) throws IOException {
+ String clientMachine, boolean newBlock, boolean logRetryCache)
+ throws IOException {
String src = srcArg;
if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* NameSystem.appendFile: src=" + src
@@ -2836,7 +2847,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
checkNameNodeSafeMode("Cannot append to file" + src);
src = dir.resolvePath(pc, src, pathComponents);
final INodesInPath iip = dir.getINodesInPath4Write(src);
- lb = appendFileInternal(pc, iip, holder, clientMachine, logRetryCache);
+ lb = appendFileInternal(pc, iip, holder, clientMachine, newBlock,
+ logRetryCache);
stat = FSDirStatAndListingOp.getFileInfo(dir, src, false,
FSDirectory.isReservedRawName(srcArg), true);
} catch (StandbyException se) {
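Note the asymmetry in quota handling that prepareFileForAppend introduces: reopening a partial block (newBlock == false) reserves the full preferred block size up front, while appending to a new block skips the reservation because the short last block is never reopened. With hypothetical numbers:

    // Old-style append: preferred block size 512 bytes, last block holds
    // 100 bytes, replication 1.
    long diff = 512 - 100;     // getPreferredBlockSize() - ret.getBlockSize()
    long reserved = diff * 1;  // * getBlockReplication()  ->  412 bytes
    // dir.updateSpaceConsumed(iip, 0, reserved);
    // NEW_BLOCK path: no reservation; the finalized short block keeps its
    // length and only the fresh block consumes quota as it is written.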
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2848db81/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
index e871bdc..cbcdac9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
@@ -412,7 +412,7 @@ public class INodeFile extends INodeWithAdditionalFields
}
/** @return the diskspace required for a full block. */
- final long getBlockDiskspace() {
+ final long getPreferredBlockDiskspace() {
return getPreferredBlockSize() * getBlockReplication();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2848db81/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/InotifyFSEditLogOpTranslator.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/InotifyFSEditLogOpTranslator.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/InotifyFSEditLogOpTranslator.java
index f265340..5345b46 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/InotifyFSEditLogOpTranslator.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/InotifyFSEditLogOpTranslator.java
@@ -65,6 +65,10 @@ public class InotifyFSEditLogOpTranslator {
FSEditLogOp.CloseOp cOp = (FSEditLogOp.CloseOp) op;
return new EventBatch(op.txid, new Event[] {
new Event.CloseEvent(cOp.path, getSize(cOp), cOp.mtime) });
+ case OP_APPEND:
+ FSEditLogOp.AppendOp appendOp = (FSEditLogOp.AppendOp) op;
+ return new EventBatch(op.txid, new Event[] {new Event.AppendEvent
+ .Builder().path(appendOp.path).newBlock(appendOp.newBlock).build()});
case OP_SET_REPLICATION:
FSEditLogOp.SetReplicationOp setRepOp = (FSEditLogOp.SetReplicationOp) op;
return new EventBatch(op.txid,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2848db81/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeLayoutVersion.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeLayoutVersion.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeLayoutVersion.java
index d742c6d..848fa33 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeLayoutVersion.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeLayoutVersion.java
@@ -70,7 +70,8 @@ public class NameNodeLayoutVersion {
"creating file with overwrite"),
XATTRS_NAMESPACE_EXT(-59, "Increase number of xattr namespaces"),
BLOCK_STORAGE_POLICY(-60, "Block Storage policy"),
- TRUNCATE(-61, "Truncate");
+ TRUNCATE(-61, "Truncate"),
+ APPEND_NEW_BLOCK(-62, "Support appending to new block");
private final FeatureInfo info;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2848db81/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index a3ac455..38fc637 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -633,15 +633,16 @@ class NameNodeRpcServer implements NamenodeProtocols {
}
@Override // ClientProtocol
- public LastBlockWithStatus append(String src, String clientName)
- throws IOException {
+ public LastBlockWithStatus append(String src, String clientName,
+ EnumSetWritable<CreateFlag> flag) throws IOException {
checkNNStartup();
String clientMachine = getClientMachine();
if (stateChangeLog.isDebugEnabled()) {
stateChangeLog.debug("*DIR* NameNode.append: file "
+src+" for "+clientName+" at "+clientMachine);
}
- CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache, null);
+ CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache,
+ null);
if (cacheEntry != null && cacheEntry.isSuccess()) {
return (LastBlockWithStatus) cacheEntry.getPayload();
}
@@ -649,7 +650,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
LastBlockWithStatus info = null;
boolean success = false;
try {
- info = namesystem.appendFile(src, clientName, clientMachine,
+ info = namesystem.appendFile(src, clientName, clientMachine, flag.get(),
cacheEntry != null);
success = true;
} finally {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2848db81/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto
index 5c9f752..34564d3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto
@@ -66,6 +66,7 @@ enum CreateFlagProto {
OVERWRITE = 0x02; // Truncate/overwrite a file. Same as POSIX O_TRUNC
APPEND = 0x04; // Append to a file
LAZY_PERSIST = 0x10; // File with reduced durability guarantees.
+ NEW_BLOCK = 0x20; // Write data to a new block when appending
}
message CreateRequestProto {
@@ -86,6 +87,7 @@ message CreateResponseProto {
message AppendRequestProto {
required string src = 1;
required string clientName = 2;
+ optional uint32 flag = 3; // bits set using CreateFlag
}
message AppendResponseProto {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2848db81/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/inotify.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/inotify.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/inotify.proto
index e50f14b..5b78fe6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/inotify.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/inotify.proto
@@ -89,6 +89,7 @@ message CloseEventProto {
message AppendEventProto {
required string path = 1;
+ optional bool newBlock = 2 [default = false];
}
message RenameEventProto {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2848db81/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/AppendTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/AppendTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/AppendTestUtil.java
index eab44be..68a85b6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/AppendTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/AppendTestUtil.java
@@ -136,6 +136,22 @@ public class AppendTestUtil {
}
}
+ public static void check(DistributedFileSystem fs, Path p, int position,
+ int length) throws IOException {
+ byte[] buf = new byte[length];
+ int i = 0;
+ try {
+ FSDataInputStream in = fs.open(p);
+ in.read(position, buf, 0, buf.length);
+ for(i = position; i < length + position; i++) {
+ assertEquals((byte) i, buf[i - position]);
+ }
+ in.close();
+ } catch(IOException ioe) {
+ throw new IOException("p=" + p + ", length=" + length + ", i=" + i, ioe);
+ }
+ }
+
/**
* create a buffer that contains the entire test file data.
*/
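
The new overload verifies a byte range starting at an arbitrary offset instead of from the beginning of the file, so a test can check each appended block separately. testTC12 in TestFileAppend3 later in this patch uses it as:

    AppendTestUtil.check(fs, p, 0, len1);     // first block only
    AppendTestUtil.check(fs, p, len1, len2);  // appended block only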
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2848db81/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
index 0eef46f..126827a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
@@ -1132,6 +1132,9 @@ public class DFSTestUtil {
FSDataOutputStream s = filesystem.create(pathFileCreate);
// OP_CLOSE 9
s.close();
+ // OP_APPEND 47
+ FSDataOutputStream s2 = filesystem.append(pathFileCreate, 4096, null);
+ s2.close();
// OP_SET_STORAGE_POLICY 45
filesystem.setStoragePolicy(pathFileCreate,
HdfsConstants.HOT_STORAGE_POLICY_NAME);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2848db81/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInotifyEventInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInotifyEventInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInotifyEventInputStream.java
index 75a4ad4..4f449d1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInotifyEventInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInotifyEventInputStream.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hdfs;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.XAttrSetFlag;
@@ -71,7 +72,7 @@ public class TestDFSInotifyEventInputStream {
*/
@Test
public void testOpcodeCount() {
- Assert.assertEquals(48, FSEditLogOpCodes.values().length);
+ Assert.assertEquals(49, FSEditLogOpCodes.values().length);
}
@@ -109,7 +110,8 @@ public class TestDFSInotifyEventInputStream {
os.write(new byte[BLOCK_SIZE]);
os.close(); // CloseOp -> CloseEvent
// AddOp -> AppendEvent
- os = client.append("/file2", BLOCK_SIZE, null, null);
+ os = client.append("/file2", BLOCK_SIZE, EnumSet.of(CreateFlag.APPEND),
+ null, null);
os.write(new byte[BLOCK_SIZE]);
os.close(); // CloseOp -> CloseEvent
Thread.sleep(10); // so that the atime will get updated on the next line
@@ -182,13 +184,14 @@ public class TestDFSInotifyEventInputStream {
Assert.assertTrue(ce2.getFileSize() > 0);
Assert.assertTrue(ce2.getTimestamp() > 0);
- // AddOp
+ // AppendOp
batch = waitForNextEvents(eis);
Assert.assertEquals(1, batch.getEvents().length);
txid = checkTxid(batch, txid);
Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.APPEND);
Event.AppendEvent append2 = (Event.AppendEvent)batch.getEvents()[0];
Assert.assertEquals("/file2", append2.getPath());
+ Assert.assertFalse(append2.toNewBlock());
// CloseOp
batch = waitForNextEvents(eis);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2848db81/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java
index 34c701d..3cb72ff 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java
@@ -25,10 +25,12 @@ import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.util.EnumSet;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.HardLink;
@@ -344,7 +346,46 @@ public class TestFileAppend{
cluster.shutdown();
}
}
+
+ /** Test two consecutive appends on a file with a full block. */
+ @Test
+ public void testAppend2Twice() throws Exception {
+ Configuration conf = new HdfsConfiguration();
+ MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
+ final DistributedFileSystem fs1 = cluster.getFileSystem();
+ final FileSystem fs2 = AppendTestUtil.createHdfsWithDifferentUsername(conf);
+ try {
+ final Path p = new Path("/testAppendTwice/foo");
+ final int len = 1 << 16;
+ final byte[] fileContents = AppendTestUtil.initBuffer(len);
+
+ {
+ // create a new file with a full block.
+ FSDataOutputStream out = fs2.create(p, true, 4096, (short)1, len);
+ out.write(fileContents, 0, len);
+ out.close();
+ }
+ //1st append does not add any data so that the last block remains full
+ //and the last block in INodeFileUnderConstruction is a BlockInfo
+ //but not BlockInfoUnderConstruction.
+ ((DistributedFileSystem) fs2).append(p,
+ EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null);
+
+ // 2nd append should get AlreadyBeingCreatedException
+ fs1.append(p);
+ Assert.fail();
+ } catch(RemoteException re) {
+ AppendTestUtil.LOG.info("Got an exception:", re);
+ Assert.assertEquals(AlreadyBeingCreatedException.class.getName(),
+ re.getClassName());
+ } finally {
+ fs2.close();
+ fs1.close();
+ cluster.shutdown();
+ }
+ }
+
/** Tests appending after soft-limit expires. */
@Test
public void testAppendAfterSoftLimit()
@@ -386,6 +427,54 @@ public class TestFileAppend{
}
}
+ /** Tests appending with the NEW_BLOCK flag after the soft-limit expires. */
+ @Test
+ public void testAppend2AfterSoftLimit() throws Exception {
+ Configuration conf = new HdfsConfiguration();
+ conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
+ //Set small soft-limit for lease
+ final long softLimit = 1L;
+ final long hardLimit = 9999999L;
+
+ MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1)
+ .build();
+ cluster.setLeasePeriod(softLimit, hardLimit);
+ cluster.waitActive();
+
+ DistributedFileSystem fs = cluster.getFileSystem();
+ DistributedFileSystem fs2 = new DistributedFileSystem();
+ fs2.initialize(fs.getUri(), conf);
+
+ final Path testPath = new Path("/testAppendAfterSoftLimit");
+ final byte[] fileContents = AppendTestUtil.initBuffer(32);
+
+ // create a new file without closing
+ FSDataOutputStream out = fs.create(testPath);
+ out.write(fileContents);
+
+ //Wait for > soft-limit
+ Thread.sleep(250);
+
+ try {
+ FSDataOutputStream appendStream2 = fs2.append(testPath,
+ EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null);
+ appendStream2.write(fileContents);
+ appendStream2.close();
+ assertEquals(fileContents.length, fs.getFileStatus(testPath).getLen());
+ // make sure we now have 1 block since the first writer was revoked
+ LocatedBlocks blks = fs.getClient().getLocatedBlocks(testPath.toString(),
+ 0L);
+ assertEquals(1, blks.getLocatedBlocks().size());
+ for (LocatedBlock blk : blks.getLocatedBlocks()) {
+ assertEquals(fileContents.length, blk.getBlockSize());
+ }
+ } finally {
+ fs.close();
+ fs2.close();
+ cluster.shutdown();
+ }
+ }
+
/**
* Old replica of the block should not be accepted as valid for append/read
*/
@@ -439,4 +528,77 @@ public class TestFileAppend{
}
}
+ /**
+ * Test multiple appends with the NEW_BLOCK flag: verify the resulting
+ * block list and that the edit log replays correctly after restarts.
+ */
+ @Test
+ public void testMultiAppend2() throws Exception {
+ Configuration conf = new HdfsConfiguration();
+ conf.set("dfs.client.block.write.replace-datanode-on-failure.enable",
+ "false");
+ MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3)
+ .build();
+ DistributedFileSystem fs = null;
+ final String hello = "hello\n";
+ try {
+ fs = cluster.getFileSystem();
+ Path path = new Path("/test");
+ FSDataOutputStream out = fs.create(path);
+ out.writeBytes(hello);
+ out.close();
+
+ // stop one datanode
+ DataNodeProperties dnProp = cluster.stopDataNode(0);
+ String dnAddress = dnProp.datanode.getXferAddress().toString();
+ if (dnAddress.startsWith("/")) {
+ dnAddress = dnAddress.substring(1);
+ }
+
+ // append twice more; each NEW_BLOCK append adds a new block
+ for (int i = 0; i < 2; i++) {
+ out = fs.append(path,
+ EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null);
+ out.writeBytes(hello);
+ out.close();
+ }
+
+ // re-open the file and put its last block under construction
+ out = fs.append(path, EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK),
+ 4096, null);
+ cluster.restartDataNode(dnProp, true);
+ // wait till the block report comes
+ Thread.sleep(2000);
+ out.writeBytes(hello);
+ out.close();
+ // check the block locations
+ LocatedBlocks blocks = fs.getClient().getLocatedBlocks(path.toString(), 0L);
+ // since we appended the file 3 times, there should be 4 blocks
+ assertEquals(4, blocks.getLocatedBlocks().size());
+ for (LocatedBlock block : blocks.getLocatedBlocks()) {
+ assertEquals(hello.length(), block.getBlockSize());
+ }
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < 4; i++) {
+ sb.append(hello);
+ }
+ final byte[] content = sb.toString().getBytes();
+ AppendTestUtil.checkFullFile(fs, path, content.length, content,
+ "Read /test");
+
+ // restart namenode to make sure the editlog can be properly applied
+ cluster.restartNameNode(true);
+ cluster.waitActive();
+ AppendTestUtil.checkFullFile(fs, path, content.length, content,
+ "Read /test");
+ blocks = fs.getClient().getLocatedBlocks(path.toString(), 0L);
+ // since we appended the file 3 times, there should be 4 blocks
+ assertEquals(4, blocks.getLocatedBlocks().size());
+ for (LocatedBlock block : blocks.getLocatedBlocks()) {
+ assertEquals(hello.length(), block.getBlockSize());
+ }
+ } finally {
+ IOUtils.closeStream(fs);
+ cluster.shutdown();
+ }
+ }
}
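
The tests in this patch exercise both append paths: the classic single-argument append, which keeps filling the last partial block, and the flag-taking overload, which seals the last block and starts a new one. Side by side (a sketch; 4096 is the buffer size and null the progress callback, matching the calls used throughout these tests):

    import java.io.IOException;
    import java.util.EnumSet;

    import org.apache.hadoop.fs.CreateFlag;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.DistributedFileSystem;

    class AppendVariants {
      static void appendBothWays(DistributedFileSystem fs, Path p, byte[] data)
          throws IOException {
        // Classic append: data continues in the last, possibly partial, block.
        FSDataOutputStream out = fs.append(p);
        try {
          out.write(data);
        } finally {
          out.close();
        }
        // NEW_BLOCK append: the last block is frozen at its current length
        // and the data goes into a brand-new block, so blocks may end up
        // shorter than the preferred block size (variable-length blocks).
        out = fs.append(p, EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK),
            4096, null);
        try {
          out.write(data);
        } finally {
          out.close();
        }
      }
    }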
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2848db81/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend2.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend2.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend2.java
index eecd23b..99d04dc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend2.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend2.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hdfs;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -24,14 +25,18 @@ import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.List;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
@@ -67,11 +72,7 @@ public class TestFileAppend2 {
final int numberOfFiles = 50;
final int numThreads = 10;
final int numAppendsPerThread = 20;
-/***
- int numberOfFiles = 1;
- int numThreads = 1;
- int numAppendsPerThread = 2000;
-****/
+
Workload[] workload = null;
final ArrayList<Path> testFiles = new ArrayList<Path>();
volatile static boolean globalStatus = true;
@@ -229,16 +230,170 @@ public class TestFileAppend2 {
}
}
+ /**
+ * Creates one file, writes a few bytes to it, and then closes it.
+ * Reopens the same file for appending with the append2 API, writes the
+ * remaining blocks, and then closes it. Verifies that all data exists in
+ * the file.
+ */
+ @Test
+ public void testSimpleAppend2() throws Exception {
+ final Configuration conf = new HdfsConfiguration();
+ if (simulatedStorage) {
+ SimulatedFSDataset.setFactory(conf);
+ }
+ conf.setInt(DFSConfigKeys.DFS_DATANODE_HANDLER_COUNT_KEY, 50);
+ fileContents = AppendTestUtil.initBuffer(AppendTestUtil.FILE_SIZE);
+ MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
+ DistributedFileSystem fs = cluster.getFileSystem();
+ try {
+ { // test appending to a file.
+ // create a new file.
+ Path file1 = new Path("/simpleAppend.dat");
+ FSDataOutputStream stm = AppendTestUtil.createFile(fs, file1, 1);
+ System.out.println("Created file simpleAppend.dat");
+
+ // write to file
+ int mid = 186; // io.bytes.per.checksum bytes
+ System.out.println("Writing " + mid + " bytes to file " + file1);
+ stm.write(fileContents, 0, mid);
+ stm.close();
+ System.out.println("Wrote and Closed first part of file.");
+
+ // write to file
+ int mid2 = 607; // io.bytes.per.checksum bytes
+ System.out.println("Writing " + mid + " bytes to file " + file1);
+ stm = fs.append(file1,
+ EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null);
+ stm.write(fileContents, mid, mid2-mid);
+ stm.close();
+ System.out.println("Wrote and Closed second part of file.");
+
+ // write the remainder of the file
+ stm = fs.append(file1,
+ EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null);
+ // ensure getPos is set to reflect existing size of the file
+ assertTrue(stm.getPos() > 0);
+ System.out.println("Writing " + (AppendTestUtil.FILE_SIZE - mid2) +
+ " bytes to file " + file1);
+ stm.write(fileContents, mid2, AppendTestUtil.FILE_SIZE - mid2);
+ System.out.println("Written second part of file");
+ stm.close();
+ System.out.println("Wrote and Closed second part of file.");
+
+ // verify that entire file is good
+ AppendTestUtil.checkFullFile(fs, file1, AppendTestUtil.FILE_SIZE,
+ fileContents, "Read 2");
+ // also make sure the file consists of 12 blocks with the expected sizes
+ List<LocatedBlock> blocks = fs.getClient().getLocatedBlocks(
+ file1.toString(), 0L).getLocatedBlocks();
+ assertEquals(12, blocks.size()); // the block size is 1024
+ assertEquals(mid, blocks.get(0).getBlockSize());
+ assertEquals(mid2 - mid, blocks.get(1).getBlockSize());
+ for (int i = 2; i < 11; i++) {
+ assertEquals(AppendTestUtil.BLOCK_SIZE, blocks.get(i).getBlockSize());
+ }
+ assertEquals((AppendTestUtil.FILE_SIZE - mid2)
+ % AppendTestUtil.BLOCK_SIZE, blocks.get(11).getBlockSize());
+ }
+
+ { // test appending to a non-existing file.
+ FSDataOutputStream out = null;
+ try {
+ out = fs.append(new Path("/non-existing.dat"),
+ EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null);
+ fail("Expected to have FileNotFoundException");
+ } catch(java.io.FileNotFoundException fnfe) {
+ System.out.println("Good: got " + fnfe);
+ fnfe.printStackTrace(System.out);
+ } finally {
+ IOUtils.closeStream(out);
+ }
+ }
+
+ { // test append permission.
+ // set root to all writable
+ Path root = new Path("/");
+ fs.setPermission(root, new FsPermission((short)0777));
+ fs.close();
+
+ // login as a different user
+ final UserGroupInformation superuser =
+ UserGroupInformation.getCurrentUser();
+ String username = "testappenduser";
+ String group = "testappendgroup";
+ assertFalse(superuser.getShortUserName().equals(username));
+ assertFalse(Arrays.asList(superuser.getGroupNames()).contains(group));
+ UserGroupInformation appenduser = UserGroupInformation
+ .createUserForTesting(username, new String[] { group });
+
+ fs = (DistributedFileSystem) DFSTestUtil.getFileSystemAs(appenduser,
+ conf);
+
+ // create a file
+ Path dir = new Path(root, getClass().getSimpleName());
+ Path foo = new Path(dir, "foo.dat");
+ FSDataOutputStream out = null;
+ int offset = 0;
+ try {
+ out = fs.create(foo);
+ int len = 10 + AppendTestUtil.nextInt(100);
+ out.write(fileContents, offset, len);
+ offset += len;
+ } finally {
+ IOUtils.closeStream(out);
+ }
+
+ // change dir and foo to minimal permissions.
+ fs.setPermission(dir, new FsPermission((short)0100));
+ fs.setPermission(foo, new FsPermission((short)0200));
+
+ // try append, should succeed
+ out = null;
+ try {
+ out = fs.append(foo,
+ EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null);
+ int len = 10 + AppendTestUtil.nextInt(100);
+ out.write(fileContents, offset, len);
+ offset += len;
+ } finally {
+ IOUtils.closeStream(out);
+ }
+
+ // relax dir and foo, but leave foo without owner write permission.
+ fs.setPermission(foo, new FsPermission((short)0577));
+ fs.setPermission(dir, new FsPermission((short)0777));
+
+ // try append, should fail
+ out = null;
+ try {
+ out = fs.append(foo,
+ EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null);
+ fail("Expected to have AccessControlException");
+ } catch(AccessControlException ace) {
+ System.out.println("Good: got " + ace);
+ ace.printStackTrace(System.out);
+ } finally {
+ IOUtils.closeStream(out);
+ }
+ }
+ } finally {
+ fs.close();
+ cluster.shutdown();
+ }
+ }
+
//
// an object that does a bunch of appends to files
//
class Workload extends Thread {
private final int id;
private final MiniDFSCluster cluster;
+ private final boolean appendToNewBlock;
- Workload(MiniDFSCluster cluster, int threadIndex) {
+ Workload(MiniDFSCluster cluster, int threadIndex, boolean append2) {
id = threadIndex;
this.cluster = cluster;
+ this.appendToNewBlock = append2;
}
// create a bunch of files. Write to them and then verify.
@@ -261,7 +416,7 @@ public class TestFileAppend2 {
long len = 0;
int sizeToAppend = 0;
try {
- FileSystem fs = cluster.getFileSystem();
+ DistributedFileSystem fs = cluster.getFileSystem();
// add a random number of bytes to file
len = fs.getFileStatus(testfile).getLen();
@@ -285,7 +440,9 @@ public class TestFileAppend2 {
" appending " + sizeToAppend + " bytes " +
" to file " + testfile +
" of size " + len);
- FSDataOutputStream stm = fs.append(testfile);
+ FSDataOutputStream stm = appendToNewBlock ? fs.append(testfile,
+ EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null)
+ : fs.append(testfile);
stm.write(fileContents, (int)len, sizeToAppend);
stm.close();
@@ -298,7 +455,7 @@ public class TestFileAppend2 {
" expected size " + (len + sizeToAppend) +
" waiting for namenode metadata update.");
Thread.sleep(5000);
- } catch (InterruptedException e) {;}
+ } catch (InterruptedException e) {}
}
assertTrue("File " + testfile + " size is " +
@@ -306,7 +463,7 @@ public class TestFileAppend2 {
" but expected " + (len + sizeToAppend),
fs.getFileStatus(testfile).getLen() == (len + sizeToAppend));
- AppendTestUtil.checkFullFile(fs, testfile, (int)(len + sizeToAppend),
+ AppendTestUtil.checkFullFile(fs, testfile, (int) (len + sizeToAppend),
fileContents, "Read 2");
} catch (Throwable e) {
globalStatus = false;
@@ -331,10 +488,8 @@ public class TestFileAppend2 {
/**
* Test that appends to files at random offsets.
- * @throws IOException an exception might be thrown
*/
- @Test
- public void testComplexAppend() throws IOException {
+ private void testComplexAppend(boolean appendToNewBlock) throws IOException {
fileContents = AppendTestUtil.initBuffer(AppendTestUtil.FILE_SIZE);
Configuration conf = new HdfsConfiguration();
conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 2000);
@@ -366,7 +521,7 @@ public class TestFileAppend2 {
// Create threads and make them run workload concurrently.
workload = new Workload[numThreads];
for (int i = 0; i < numThreads; i++) {
- workload[i] = new Workload(cluster, i);
+ workload[i] = new Workload(cluster, i, appendToNewBlock);
workload[i].start();
}
@@ -390,4 +545,14 @@ public class TestFileAppend2 {
//
assertTrue("testComplexAppend Worker encountered exceptions.", globalStatus);
}
+
+ @Test
+ public void testComplexAppend() throws IOException {
+ testComplexAppend(false);
+ }
+
+ @Test
+ public void testComplexAppend2() throws IOException {
+ testComplexAppend(true);
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2848db81/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend3.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend3.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend3.java
index d5de0ff..9ebe115 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend3.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend3.java
@@ -24,7 +24,10 @@ import static org.junit.Assert.fail;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
+import java.util.EnumSet;
+import java.util.List;
+import org.apache.hadoop.fs.CreateFlag;
import org.mockito.invocation.InvocationOnMock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
@@ -36,8 +39,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DFSClient;
-import org.apache.hadoop.hdfs.DFSClientAdapter;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -52,6 +53,7 @@ import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
import org.apache.log4j.Level;
import org.junit.AfterClass;
+import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -121,6 +123,32 @@ public class TestFileAppend3 {
AppendTestUtil.check(fs, p, len1 + len2);
}
+ @Test
+ public void testTC1ForAppend2() throws Exception {
+ final Path p = new Path("/TC1/foo2");
+
+ //a. Create file and write one block of data. Close file.
+ final int len1 = (int) BLOCK_SIZE;
+ {
+ FSDataOutputStream out = fs.create(p, false, buffersize, REPLICATION,
+ BLOCK_SIZE);
+ AppendTestUtil.write(out, 0, len1);
+ out.close();
+ }
+
+ // Reopen file to append. Append half block of data. Close file.
+ final int len2 = (int) BLOCK_SIZE / 2;
+ {
+ FSDataOutputStream out = fs.append(p,
+ EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null);
+ AppendTestUtil.write(out, len1, len2);
+ out.close();
+ }
+
+ // b. Reopen file and read 1.5 blocks worth of data. Close file.
+ AppendTestUtil.check(fs, p, len1 + len2);
+ }
+
/**
* TC2: Append on non-block boundary.
* @throws IOException an exception might be thrown
@@ -152,6 +180,40 @@ public class TestFileAppend3 {
AppendTestUtil.check(fs, p, len1 + len2);
}
+ @Test
+ public void testTC2ForAppend2() throws Exception {
+ final Path p = new Path("/TC2/foo2");
+
+ //a. Create file with one and a half blocks of data. Close file.
+ final int len1 = (int) (BLOCK_SIZE + BLOCK_SIZE / 2);
+ {
+ FSDataOutputStream out = fs.create(p, false, buffersize, REPLICATION,
+ BLOCK_SIZE);
+ AppendTestUtil.write(out, 0, len1);
+ out.close();
+ }
+
+ AppendTestUtil.check(fs, p, len1);
+
+ // Reopen file to append a quarter block of data. Close file.
+ final int len2 = (int) BLOCK_SIZE / 4;
+ {
+ FSDataOutputStream out = fs.append(p, EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK),
+ 4096, null);
+ AppendTestUtil.write(out, len1, len2);
+ out.close();
+ }
+
+ // b. Reopen file and read 1.75 blocks of data. Close file.
+ AppendTestUtil.check(fs, p, len1 + len2);
+ List<LocatedBlock> blocks = fs.getClient().getLocatedBlocks(
+ p.toString(), 0L).getLocatedBlocks();
+ Assert.assertEquals(3, blocks.size());
+ Assert.assertEquals(BLOCK_SIZE, blocks.get(0).getBlockSize());
+ Assert.assertEquals(BLOCK_SIZE / 2, blocks.get(1).getBlockSize());
+ Assert.assertEquals(BLOCK_SIZE / 4, blocks.get(2).getBlockSize());
+ }
+
/**
* TC5: Only one simultaneous append.
* @throws IOException an exception might be thrown
@@ -179,18 +241,63 @@ public class TestFileAppend3 {
AppendTestUtil.LOG.info("GOOD: got an exception", ioe);
}
+ try {
+ ((DistributedFileSystem) AppendTestUtil
+ .createHdfsWithDifferentUsername(conf)).append(p,
+ EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null);
+ fail("This should fail.");
+ } catch(IOException ioe) {
+ AppendTestUtil.LOG.info("GOOD: got an exception", ioe);
+ }
+
//d. On Machine M1, close file.
out.close();
}
+ @Test
+ public void testTC5ForAppend2() throws Exception {
+ final Path p = new Path("/TC5/foo2");
+
+ // a. Create file on Machine M1. Write half block to it. Close file.
+ {
+ FSDataOutputStream out = fs.create(p, false, buffersize, REPLICATION,
+ BLOCK_SIZE);
+ AppendTestUtil.write(out, 0, (int)(BLOCK_SIZE/2));
+ out.close();
+ }
+
+ // b. Reopen file in "append" mode on Machine M1.
+ FSDataOutputStream out = fs.append(p, EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK),
+ 4096, null);
+
+ // c. On Machine M2, reopen file in "append" mode. This should fail.
+ try {
+ ((DistributedFileSystem) AppendTestUtil
+ .createHdfsWithDifferentUsername(conf)).append(p,
+ EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null);
+ fail("This should fail.");
+ } catch(IOException ioe) {
+ AppendTestUtil.LOG.info("GOOD: got an exception", ioe);
+ }
+
+ try {
+ AppendTestUtil.createHdfsWithDifferentUsername(conf).append(p);
+ fail("This should fail.");
+ } catch(IOException ioe) {
+ AppendTestUtil.LOG.info("GOOD: got an exception", ioe);
+ }
+
+ // d. On Machine M1, close file.
+ out.close();
+ }
+
/**
* TC7: Corrupted replicas are present.
* @throws IOException an exception might be thrown
*/
- @Test
- public void testTC7() throws Exception {
+ private void testTC7(boolean appendToNewBlock) throws Exception {
final short repl = 2;
- final Path p = new Path("/TC7/foo");
+ final Path p = new Path("/TC7/foo" + (appendToNewBlock ? "0" : "1"));
System.out.println("p=" + p);
//a. Create file with replication factor of 2. Write half block of data. Close file.
@@ -224,7 +331,8 @@ public class TestFileAppend3 {
//c. Open file in "append mode". Append a new block worth of data. Close file.
final int len2 = (int)BLOCK_SIZE;
{
- FSDataOutputStream out = fs.append(p);
+ FSDataOutputStream out = appendToNewBlock ?
+ fs.append(p, EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null) : fs.append(p);
AppendTestUtil.write(out, len1, len2);
out.close();
}
@@ -233,13 +341,21 @@ public class TestFileAppend3 {
AppendTestUtil.check(fs, p, len1 + len2);
}
+ @Test
+ public void testTC7() throws Exception {
+ testTC7(false);
+ }
+
+ @Test
+ public void testTC7ForAppend2() throws Exception {
+ testTC7(true);
+ }
+
/**
* TC11: Racing rename
- * @throws IOException an exception might be thrown
*/
- @Test
- public void testTC11() throws Exception {
- final Path p = new Path("/TC11/foo");
+ private void testTC11(boolean appendToNewBlock) throws Exception {
+ final Path p = new Path("/TC11/foo" + (appendToNewBlock ? "0" : "1"));
System.out.println("p=" + p);
//a. Create file and write one block of data. Close file.
@@ -251,7 +367,9 @@ public class TestFileAppend3 {
}
//b. Reopen file in "append" mode. Append half block of data.
- FSDataOutputStream out = fs.append(p);
+ FSDataOutputStream out = appendToNewBlock ?
+ fs.append(p, EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null) :
+ fs.append(p);
final int len2 = (int)BLOCK_SIZE/2;
AppendTestUtil.write(out, len1, len2);
out.hflush();
@@ -283,13 +401,21 @@ public class TestFileAppend3 {
}
}
+ @Test
+ public void testTC11() throws Exception {
+ testTC11(false);
+ }
+
+ @Test
+ public void testTC11ForAppend2() throws Exception {
+ testTC11(true);
+ }
+
/**
* TC12: Append to partial CRC chunk
- * @throws IOException an exception might be thrown
*/
- @Test
- public void testTC12() throws Exception {
- final Path p = new Path("/TC12/foo");
+ private void testTC12(boolean appendToNewBlock) throws Exception {
+ final Path p = new Path("/TC12/foo" + (appendToNewBlock ? "0" : "1"));
System.out.println("p=" + p);
//a. Create file with a block size of 64KB
@@ -305,23 +431,43 @@ public class TestFileAppend3 {
//b. Reopen file in "append" mode. Append another 5877 bytes of data. Close file.
final int len2 = 5877;
{
- FSDataOutputStream out = fs.append(p);
+ FSDataOutputStream out = appendToNewBlock ?
+ fs.append(p, EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null) :
+ fs.append(p);
AppendTestUtil.write(out, len1, len2);
out.close();
}
//c. Reopen file and read 25687+5877 bytes of data from file. Close file.
AppendTestUtil.check(fs, p, len1 + len2);
+ if (appendToNewBlock) {
+ LocatedBlocks blks = fs.dfs.getLocatedBlocks(p.toString(), 0);
+ Assert.assertEquals(2, blks.getLocatedBlocks().size());
+ Assert.assertEquals(len1, blks.getLocatedBlocks().get(0).getBlockSize());
+ Assert.assertEquals(len2, blks.getLocatedBlocks().get(1).getBlockSize());
+ AppendTestUtil.check(fs, p, 0, len1);
+ AppendTestUtil.check(fs, p, len1, len2);
+ }
}
-
- /** Append to a partial CRC chunk and
- * the first write does not fill up the partial CRC trunk
- * *
- * @throws IOException
- */
+
@Test
- public void testAppendToPartialChunk() throws IOException {
- final Path p = new Path("/partialChunk/foo");
+ public void testTC12() throws Exception {
+ testTC12(false);
+ }
+
+ @Test
+ public void testTC12ForAppend2() throws Exception {
+ testTC12(true);
+ }
+
+ /**
+ * Append to a partial CRC chunk where the first write does not fill up
+ * the partial CRC chunk.
+ */
+ private void testAppendToPartialChunk(boolean appendToNewBlock)
+ throws IOException {
+ final Path p = new Path("/partialChunk/foo"
+ + (appendToNewBlock ? "0" : "1"));
final int fileLen = 513;
System.out.println("p=" + p);
@@ -336,7 +482,9 @@ public class TestFileAppend3 {
System.out.println("Wrote 1 byte and closed the file " + p);
// append to file
- stm = fs.append(p);
+ stm = appendToNewBlock ?
+ fs.append(p, EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null) :
+ fs.append(p);
// Append to a partial CRC chunk
stm.write(fileContents, 1, 1);
stm.hflush();
@@ -345,7 +493,9 @@ public class TestFileAppend3 {
System.out.println("Append 1 byte and closed the file " + p);
// write the remainder of the file
- stm = fs.append(p);
+ stm = appendToNewBlock ?
+ fs.append(p, EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null) :
+ fs.append(p);
// ensure getPos is set to reflect existing size of the file
assertEquals(2, stm.getPos());
@@ -444,4 +594,14 @@ public class TestFileAppend3 {
// if append was called with a stale file stat.
doSmallAppends(file, fs, 20);
}
+
+ @Test
+ public void testAppendToPartialChunk() throws IOException {
+ testAppendToPartialChunk(false);
+ }
+
+ @Test
+ public void testAppendToPartialChunkforAppend2() throws IOException {
+ testAppendToPartialChunk(true);
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2848db81/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppendRestart.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppendRestart.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppendRestart.java
index 0bca23d..a2b344c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppendRestart.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppendRestart.java
@@ -99,10 +99,11 @@ public class TestFileAppendRestart {
// OP_ADD to create file
// OP_ADD_BLOCK for first block
// OP_CLOSE to close file
- // OP_ADD to reopen file
+ // OP_APPEND to reopen file
// OP_ADD_BLOCK for second block
// OP_CLOSE to close file
- assertEquals(2, (int)counts.get(FSEditLogOpCodes.OP_ADD).held);
+ assertEquals(1, (int)counts.get(FSEditLogOpCodes.OP_ADD).held);
+ assertEquals(1, (int)counts.get(FSEditLogOpCodes.OP_APPEND).held);
assertEquals(2, (int)counts.get(FSEditLogOpCodes.OP_ADD_BLOCK).held);
assertEquals(2, (int)counts.get(FSEditLogOpCodes.OP_CLOSE).held);
@@ -112,13 +113,14 @@ public class TestFileAppendRestart {
// OP_ADD to create file
// OP_ADD_BLOCK for first block
// OP_CLOSE to close file
- // OP_ADD to re-establish the lease
+ // OP_APPEND to re-establish the lease
// OP_UPDATE_BLOCKS from the updatePipeline call (increments genstamp of last block)
// OP_ADD_BLOCK at the start of the second block
// OP_CLOSE to close file
// Total: 1 OP_ADD, 1 OP_APPEND, 1 OP_UPDATE_BLOCKS, 2 OP_ADD_BLOCKs, and 2 OP_CLOSEs
// in addition to the ones above
- assertEquals(2+2, (int)counts.get(FSEditLogOpCodes.OP_ADD).held);
+ assertEquals(2, (int)counts.get(FSEditLogOpCodes.OP_ADD).held);
+ assertEquals(2, (int)counts.get(FSEditLogOpCodes.OP_APPEND).held);
assertEquals(1, (int)counts.get(FSEditLogOpCodes.OP_UPDATE_BLOCKS).held);
assertEquals(2+2, (int)counts.get(FSEditLogOpCodes.OP_ADD_BLOCK).held);
assertEquals(2+2, (int)counts.get(FSEditLogOpCodes.OP_CLOSE).held);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2848db81/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHFlush.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHFlush.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHFlush.java
index 9ada95f..6bcfa71 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHFlush.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHFlush.java
@@ -31,7 +31,9 @@ import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.io.IOUtils;
import org.apache.log4j.Level;
import org.junit.Test;
@@ -121,7 +123,66 @@ public class TestHFlush {
cluster.shutdown();
}
}
-
+
+ /**
+ * Test hsync with END_BLOCK flag.
+ */
+ @Test
+ public void hSyncEndBlock_00() throws IOException {
+ final int preferredBlockSize = 1024;
+ Configuration conf = new HdfsConfiguration();
+ conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, preferredBlockSize);
+ MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2)
+ .build();
+ DistributedFileSystem fileSystem = cluster.getFileSystem();
+ FSDataOutputStream stm = null;
+ try {
+ Path path = new Path("/" + fName);
+ stm = fileSystem.create(path, true, 4096, (short) 2,
+ AppendTestUtil.BLOCK_SIZE);
+ System.out.println("Created file " + path.toString());
+ ((DFSOutputStream) stm.getWrappedStream()).hsync(EnumSet
+ .of(SyncFlag.END_BLOCK));
+ long currentFileLength = fileSystem.getFileStatus(path).getLen();
+ assertEquals(0L, currentFileLength);
+ LocatedBlocks blocks = fileSystem.dfs.getLocatedBlocks(path.toString(), 0);
+ assertEquals(0, blocks.getLocatedBlocks().size());
+
+ // write a block and call hsync(end_block) at the block boundary
+ stm.write(new byte[preferredBlockSize]);
+ ((DFSOutputStream) stm.getWrappedStream()).hsync(EnumSet
+ .of(SyncFlag.END_BLOCK));
+ currentFileLength = fileSystem.getFileStatus(path).getLen();
+ assertEquals(preferredBlockSize, currentFileLength);
+ blocks = fileSystem.dfs.getLocatedBlocks(path.toString(), 0);
+ assertEquals(1, blocks.getLocatedBlocks().size());
+
+ // call hsync then call hsync(end_block) immediately
+ stm.write(new byte[preferredBlockSize / 2]);
+ stm.hsync();
+ ((DFSOutputStream) stm.getWrappedStream()).hsync(EnumSet
+ .of(SyncFlag.END_BLOCK));
+ currentFileLength = fileSystem.getFileStatus(path).getLen();
+ assertEquals(preferredBlockSize + preferredBlockSize / 2,
+ currentFileLength);
+ blocks = fileSystem.dfs.getLocatedBlocks(path.toString(), 0);
+ assertEquals(2, blocks.getLocatedBlocks().size());
+
+ stm.write(new byte[preferredBlockSize / 4]);
+ stm.hsync();
+ currentFileLength = fileSystem.getFileStatus(path).getLen();
+ assertEquals(preferredBlockSize + preferredBlockSize / 2
+ + preferredBlockSize / 4, currentFileLength);
+ blocks = fileSystem.dfs.getLocatedBlocks(path.toString(), 0);
+ assertEquals(3, blocks.getLocatedBlocks().size());
+ } finally {
+ IOUtils.cleanup(null, stm, fileSystem);
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+ }
+
/**
* The test calls
* {@link #doTheJob(Configuration, String, long, short, boolean, EnumSet)}
@@ -136,6 +197,29 @@ public class TestHFlush {
/**
* The test calls
* {@link #doTheJob(Configuration, String, long, short, boolean, EnumSet)}
+ * while requiring the semantic of {@link SyncFlag#END_BLOCK}.
+ */
+ @Test
+ public void hSyncEndBlock_01() throws IOException {
+ doTheJob(new HdfsConfiguration(), fName, AppendTestUtil.BLOCK_SIZE,
+ (short) 2, true, EnumSet.of(SyncFlag.END_BLOCK));
+ }
+
+ /**
+ * The test calls
+ * {@link #doTheJob(Configuration, String, long, short, boolean, EnumSet)}
+ * while requiring the semantic of {@link SyncFlag#END_BLOCK} and
+ * {@link SyncFlag#UPDATE_LENGTH}.
+ */
+ @Test
+ public void hSyncEndBlockAndUpdateLength() throws IOException {
+ doTheJob(new HdfsConfiguration(), fName, AppendTestUtil.BLOCK_SIZE,
+ (short) 2, true, EnumSet.of(SyncFlag.END_BLOCK, SyncFlag.UPDATE_LENGTH));
+ }
+
+ /**
+ * The test calls
+ * {@link #doTheJob(Configuration, String, long, short, boolean, EnumSet)}
* while requiring the semantic of {@link SyncFlag#UPDATE_LENGTH}.
* Similar to {@link #hFlush_02()}, it writes a file with a custom block
* size so the writes happen across block boundaries.
@@ -152,7 +236,20 @@ public class TestHFlush {
doTheJob(conf, fName, customBlockSize, (short) 2, true,
EnumSet.of(SyncFlag.UPDATE_LENGTH));
}
-
+
+ @Test
+ public void hSyncEndBlock_02() throws IOException {
+ Configuration conf = new HdfsConfiguration();
+ int customPerChecksumSize = 512;
+ int customBlockSize = customPerChecksumSize * 3;
+ // Modify default filesystem settings
+ conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, customPerChecksumSize);
+ conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, customBlockSize);
+
+ doTheJob(conf, fName, customBlockSize, (short) 2, true,
+ EnumSet.of(SyncFlag.END_BLOCK));
+ }
+
/**
* The test calls
* {@link #doTheJob(Configuration, String, long, short, boolean, EnumSet)}
@@ -173,7 +270,20 @@ public class TestHFlush {
doTheJob(conf, fName, customBlockSize, (short) 2, true,
EnumSet.of(SyncFlag.UPDATE_LENGTH));
}
-
+
+ @Test
+ public void hSyncEndBlock_03() throws IOException {
+ Configuration conf = new HdfsConfiguration();
+ int customPerChecksumSize = 400;
+ int customBlockSize = customPerChecksumSize * 3;
+ // Modify default filesystem settings
+ conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, customPerChecksumSize);
+ conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, customBlockSize);
+
+ doTheJob(conf, fName, customBlockSize, (short) 2, true,
+ EnumSet.of(SyncFlag.END_BLOCK));
+ }
+
/**
* The method starts new cluster with defined Configuration; creates a file
* with specified block_size and writes 10 equal sections in it; it also calls
@@ -197,12 +307,13 @@ public class TestHFlush {
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(replicas).build();
// Make sure we work with DFS in order to utilize all its functionality
- DistributedFileSystem fileSystem =
- cluster.getFileSystem();
+ DistributedFileSystem fileSystem = cluster.getFileSystem();
FSDataInputStream is;
try {
Path path = new Path(fileName);
+ final String pathName = new Path(fileSystem.getWorkingDirectory(), path)
+ .toUri().getPath();
FSDataOutputStream stm = fileSystem.create(path, false, 4096, replicas,
block_size);
System.out.println("Created file " + fileName);
@@ -210,7 +321,8 @@ public class TestHFlush {
int tenth = AppendTestUtil.FILE_SIZE/SECTIONS;
int rounding = AppendTestUtil.FILE_SIZE - tenth * SECTIONS;
for (int i=0; i<SECTIONS; i++) {
- System.out.println("Writing " + (tenth * i) + " to " + (tenth * (i+1)) + " section to file " + fileName);
+ System.out.println("Writing " + (tenth * i) + " to "
+ + (tenth * (i + 1)) + " section to file " + fileName);
// write to the file
stm.write(fileContent, tenth * i, tenth);
@@ -227,7 +339,11 @@ public class TestHFlush {
assertEquals(
"File size doesn't match for hsync/hflush with updating the length",
tenth * (i + 1), currentFileLength);
+ } else if (isSync && syncFlags.contains(SyncFlag.END_BLOCK)) {
+ LocatedBlocks blocks = fileSystem.dfs.getLocatedBlocks(pathName, 0);
+ assertEquals(i + 1, blocks.getLocatedBlocks().size());
}
+
byte [] toRead = new byte[tenth];
byte [] expected = new byte[tenth];
System.arraycopy(fileContent, tenth * i, expected, 0, tenth);
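
For application code, the END_BLOCK sync used by these tests goes through the wrapped DFSOutputStream, exactly as hSyncEndBlock_00 does above. Isolated into a helper (a sketch; it assumes the stream came from DistributedFileSystem, so the wrapped stream is a DFSOutputStream):

    import java.io.IOException;
    import java.util.EnumSet;

    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.hdfs.DFSOutputStream;
    import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag;

    class EndBlockSync {
      // Flush and persist the bytes written so far, then seal the current
      // block; the next write starts a new (possibly shorter) block.
      static void hsyncEndBlock(FSDataOutputStream out) throws IOException {
        ((DFSOutputStream) out.getWrappedStream())
            .hsync(EnumSet.of(SyncFlag.END_BLOCK));
      }
    }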
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2848db81/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLeaseRecovery.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLeaseRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLeaseRecovery.java
index b84989f..15580a5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLeaseRecovery.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLeaseRecovery.java
@@ -22,8 +22,10 @@ import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
+import java.util.EnumSet;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -40,6 +42,7 @@ import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.TestInterDatanodeProtocol;
import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
+import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.security.UserGroupInformation;
import org.junit.After;
import org.junit.Test;
@@ -124,7 +127,8 @@ public class TestLeaseRecovery {
}
DataNode.LOG.info("dfs.dfs.clientName=" + dfs.dfs.clientName);
- cluster.getNameNodeRpc().append(filestr, dfs.dfs.clientName);
+ cluster.getNameNodeRpc().append(filestr, dfs.dfs.clientName,
+ new EnumSetWritable<>(EnumSet.of(CreateFlag.APPEND)));
// expire lease to trigger block recovery.
waitLeaseRecovery(cluster);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2848db81/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
index 84ac2a5..a4df4ab 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import com.google.common.util.concurrent.Uninterruptibles;
+import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
@@ -28,6 +29,7 @@ import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
+import java.util.EnumSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -234,7 +236,8 @@ public class TestLazyPersistFiles extends LazyPersistTestCase {
makeTestFile(path, BLOCK_SIZE, true);
try {
- client.append(path.toString(), BUFFER_LENGTH, null, null).close();
+ client.append(path.toString(), BUFFER_LENGTH,
+ EnumSet.of(CreateFlag.APPEND), null, null).close();
fail("Append to LazyPersist file did not fail as expected");
} catch (Throwable t) {
LOG.info("Got expected exception ", t);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2848db81/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestHDFSConcat.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestHDFSConcat.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestHDFSConcat.java
index 6d1f452..ddf5a3e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestHDFSConcat.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestHDFSConcat.java
@@ -40,9 +40,12 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
+import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.security.UserGroupInformation;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -99,7 +102,7 @@ public class TestHDFSConcat {
HdfsFileStatus fStatus;
FSDataInputStream stm;
- String trg = new String("/trg");
+ String trg = "/trg";
Path trgPath = new Path(trg);
DFSTestUtil.createFile(dfs, trgPath, fileLen, REPL_FACTOR, 1);
fStatus = nn.getFileInfo(trg);
@@ -112,7 +115,7 @@ public class TestHDFSConcat {
long [] lens = new long [numFiles];
- int i = 0;
+ int i;
for(i=0; i<files.length; i++) {
files[i] = new Path("/file"+i);
Path path = files[i];
@@ -385,6 +388,75 @@ public class TestHDFSConcat {
} catch (Exception e) {
// expected
}
-
+ }
+
+ /**
+ * Make sure the quota is updated correctly after concat: the concatenated
+ * blocks take on the target's replication factor, so the space consumed
+ * shrinks when srcRepl is greater than REPL_FACTOR
+ */
+ @Test
+ public void testConcatWithQuotaDecrease() throws IOException {
+ final short srcRepl = 3; // note this differs from REPL_FACTOR
+ final int srcNum = 10;
+ final Path foo = new Path("/foo");
+ final Path[] srcs = new Path[srcNum];
+ final Path target = new Path(foo, "target");
+ DFSTestUtil.createFile(dfs, target, blockSize, REPL_FACTOR, 0L);
+
+ dfs.setQuota(foo, Long.MAX_VALUE - 1, Long.MAX_VALUE - 1);
+
+ for (int i = 0; i < srcNum; i++) {
+ srcs[i] = new Path(foo, "src" + i);
+ DFSTestUtil.createFile(dfs, srcs[i], blockSize * 2, srcRepl, 0L);
+ }
+
+ ContentSummary summary = dfs.getContentSummary(foo);
+ Assert.assertEquals(11, summary.getFileCount());
+ Assert.assertEquals(blockSize * REPL_FACTOR +
+ blockSize * 2 * srcRepl * srcNum, summary.getSpaceConsumed());
+
+ dfs.concat(target, srcs);
+ summary = dfs.getContentSummary(foo);
+ Assert.assertEquals(1, summary.getFileCount());
+ Assert.assertEquals(
+ blockSize * REPL_FACTOR + blockSize * 2 * REPL_FACTOR * srcNum,
+ summary.getSpaceConsumed());
+ }
+
+ @Test
+ public void testConcatWithQuotaIncrease() throws IOException {
+ final short repl = 3;
+ final int srcNum = 10;
+ final Path foo = new Path("/foo");
+ final Path bar = new Path(foo, "bar");
+ final Path[] srcs = new Path[srcNum];
+ final Path target = new Path(bar, "target");
+ DFSTestUtil.createFile(dfs, target, blockSize, repl, 0L);
+
+ final long dsQuota = blockSize * repl + blockSize * srcNum * REPL_FACTOR;
+ dfs.setQuota(foo, Long.MAX_VALUE - 1, dsQuota);
+
+ for (int i = 0; i < srcNum; i++) {
+ srcs[i] = new Path(bar, "src" + i);
+ DFSTestUtil.createFile(dfs, srcs[i], blockSize, REPL_FACTOR, 0L);
+ }
+
+ ContentSummary summary = dfs.getContentSummary(bar);
+ Assert.assertEquals(11, summary.getFileCount());
+ Assert.assertEquals(dsQuota, summary.getSpaceConsumed());
+
+ try {
+ dfs.concat(target, srcs);
+ fail("QuotaExceededException expected");
+ } catch (RemoteException e) {
+ Assert.assertTrue(
+ e.unwrapRemoteException() instanceof QuotaExceededException);
+ }
+
+ dfs.setQuota(foo, Long.MAX_VALUE - 1, Long.MAX_VALUE - 1);
+ dfs.concat(target, srcs);
+ summary = dfs.getContentSummary(bar);
+ Assert.assertEquals(1, summary.getFileCount());
+ Assert.assertEquals(blockSize * repl * (srcNum + 1),
+ summary.getSpaceConsumed());
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2848db81/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeRetryCache.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeRetryCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeRetryCache.java
index 3084f26..2e6b4a3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeRetryCache.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeRetryCache.java
@@ -232,14 +232,18 @@ public class TestNamenodeRetryCache {
// Retried append requests succeed
newCall();
- LastBlockWithStatus b = nnRpc.append(src, "holder");
- Assert.assertEquals(b, nnRpc.append(src, "holder"));
- Assert.assertEquals(b, nnRpc.append(src, "holder"));
+ LastBlockWithStatus b = nnRpc.append(src, "holder",
+ new EnumSetWritable<>(EnumSet.of(CreateFlag.APPEND)));
+ Assert.assertEquals(b, nnRpc.append(src, "holder",
+ new EnumSetWritable<>(EnumSet.of(CreateFlag.APPEND))));
+ Assert.assertEquals(b, nnRpc.append(src, "holder",
+ new EnumSetWritable<>(EnumSet.of(CreateFlag.APPEND))));
// non-retried call fails
newCall();
try {
- nnRpc.append(src, "holder");
+ nnRpc.append(src, "holder",
+ new EnumSetWritable<>(EnumSet.of(CreateFlag.APPEND)));
Assert.fail("testAppend - expected exception is not thrown");
} catch (Exception e) {
// Expected
@@ -409,7 +413,7 @@ public class TestNamenodeRetryCache {
LightWeightCache<CacheEntry, CacheEntry> cacheSet =
(LightWeightCache<CacheEntry, CacheEntry>) namesystem.getRetryCache().getCacheSet();
- assertEquals(24, cacheSet.size());
+ assertEquals(25, cacheSet.size());
Map<CacheEntry, CacheEntry> oldEntries =
new HashMap<CacheEntry, CacheEntry>();
@@ -428,7 +432,7 @@ public class TestNamenodeRetryCache {
assertTrue(namesystem.hasRetryCache());
cacheSet = (LightWeightCache<CacheEntry, CacheEntry>) namesystem
.getRetryCache().getCacheSet();
- assertEquals(24, cacheSet.size());
+ assertEquals(25, cacheSet.size());
iter = cacheSet.iterator();
while (iter.hasNext()) {
CacheEntry entry = iter.next();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2848db81/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java
index 066fd66..916893c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java
@@ -163,7 +163,7 @@ public class TestRetryCacheWithHA {
FSNamesystem fsn0 = cluster.getNamesystem(0);
LightWeightCache<CacheEntry, CacheEntry> cacheSet =
(LightWeightCache<CacheEntry, CacheEntry>) fsn0.getRetryCache().getCacheSet();
- assertEquals(24, cacheSet.size());
+ assertEquals(25, cacheSet.size());
Map<CacheEntry, CacheEntry> oldEntries =
new HashMap<CacheEntry, CacheEntry>();
@@ -184,7 +184,7 @@ public class TestRetryCacheWithHA {
FSNamesystem fsn1 = cluster.getNamesystem(1);
cacheSet = (LightWeightCache<CacheEntry, CacheEntry>) fsn1
.getRetryCache().getCacheSet();
- assertEquals(24, cacheSet.size());
+ assertEquals(25, cacheSet.size());
iter = cacheSet.iterator();
while (iter.hasNext()) {
CacheEntry entry = iter.next();
@@ -438,7 +438,8 @@ public class TestRetryCacheWithHA {
@Override
void invoke() throws Exception {
- lbk = client.getNamenode().append(fileName, client.getClientName());
+ lbk = client.getNamenode().append(fileName, client.getClientName(),
+ new EnumSetWritable<>(EnumSet.of(CreateFlag.APPEND)));
}
// check if the inode of the file is under construction
@@ -701,7 +702,8 @@ public class TestRetryCacheWithHA {
final Path filePath = new Path(file);
DFSTestUtil.createFile(dfs, filePath, BlockSize, DataNodes, 0);
// append to the file and leave the last block under construction
- out = this.client.append(file, BlockSize, null, null);
+ out = this.client.append(file, BlockSize, EnumSet.of(CreateFlag.APPEND),
+ null, null);
byte[] appendContent = new byte[100];
new Random().nextBytes(appendContent);
out.write(appendContent);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2848db81/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/editsStored
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/editsStored b/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/editsStored
index dce3f47..da8c190 100644
Binary files a/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/editsStored and b/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/editsStored differ